0
点赞
收藏
分享

微信扫一扫

Flink自定义输入源算子

一天清晨 2022-06-24 阅读 82

Flink 内置了许多默认的输入源,也支持自定义数据源,自定义数据源分为串行和并行两种。

  1. 串行数据源
package cn.qz.source;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Custom Flink source implementing the plain {@link SourceFunction} interface.
 *
 * <p>Emits up to 100 random four-character alphanumeric strings, sleeping two
 * seconds before each one, and stops early once {@link #cancel()} has been called.
 */
@Slf4j
public class MySourceFunction implements SourceFunction<String> {

    /** Upper bound on the number of records emitted by a single {@code run} call. */
    private static final int MAX_RECORDS = 100;

    /** Pause, in milliseconds, taken before each record is emitted. */
    private static final long EMIT_INTERVAL_MILLIS = 2 * 1000;

    /** Length of every generated random string. */
    private static final int TOKEN_LENGTH = 4;

    /** Cleared by {@link #cancel()}; volatile so the emitting loop sees the change. */
    private volatile boolean isRunning = true;

    /**
     * Emits random strings through {@code ctx} until the record limit is reached
     * or the source has been cancelled.
     *
     * @param ctx context used to hand records to the stream
     * @throws Exception if the sleep between records is interrupted
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        log.info("cn.qz.source.MySourceFunction.run start");
        int emitted = 0;
        while (emitted < MAX_RECORDS && isRunning) {
            Thread.sleep(EMIT_INTERVAL_MILLIS);
            String token = RandomStringUtils.randomAlphanumeric(TOKEN_LENGTH);
            ctx.collect(token);
            emitted++;
        }
        log.info("cn.qz.source.MySourceFunction.run end");
    }

    /**
     * Executed when the job is cancelled from the web UI; flips the running flag
     * so the loop in {@code run} exits at its next condition check.
     */
    @Override
    public void cancel() {
        this.isRunning = false;
        log.info("cn.qz.source.MySourceFunction.cancel");
    }
}

测试类:

package cn.qz.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Driver that runs {@link MySourceFunction} in a local Flink environment
 * created with the web UI enabled.
 */
public class CustomSource {

    /**
     * Creates the local environment, sets its parallelism to 1, registers
     * {@link MySourceFunction} as the source, prints every emitted record and
     * executes the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(1);

        MySourceFunction source = new MySourceFunction();
        DataStreamSource<String> randomStrings = env.addSource(source);
        randomStrings.print();

        env.execute();
    }
}
  2. 并行数据源

数据源:

package cn.qz.source;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import scala.collection.Parallel;

/**
 * Custom Flink source implementing {@link ParallelSourceFunction}, so the
 * source operator may be given a parallelism greater than one.
 *
 * <p>Each {@code run} invocation emits up to three {@code User} records built
 * from a random four-letter string and a number starting at 100.
 */
@Slf4j
public class MySourceFunction2 implements ParallelSourceFunction<User> {

    /** Number of records emitted by a single {@code run} call. */
    private static final int RECORDS_PER_RUN = 3;

    /** Base value for the numeric constructor argument of each {@code User}. */
    private static final int BASE_NUMBER = 100;

    /** Cleared by {@link #cancel()}; volatile so the emitting loop sees the change. */
    private volatile boolean isRunning = true;

    /**
     * Emits {@code User} records through {@code ctx} until the record limit is
     * reached or the source has been cancelled.
     *
     * @param ctx context used to hand records to the stream
     * @throws Exception declared by the interface; nothing in this body throws a checked exception
     */
    @Override
    public void run(SourceContext<User> ctx) throws Exception {
        log.info("cn.qz.source.MySourceFunction2.run start");
        int offset = 0;
        while (offset < RECORDS_PER_RUN && isRunning) {
            // The same random string is passed as both of the first two arguments.
            String randomName = RandomStringUtils.randomAlphabetic(4);
            ctx.collect(new User(randomName, randomName, BASE_NUMBER + offset));
            offset++;
        }
        log.info("cn.qz.source.MySourceFunction2.run end");
    }

    /**
     * Logs the cancellation and then flips the running flag so the loop in
     * {@code run} exits at its next condition check.
     */
    @Override
    public void cancel() {
        log.info("cn.qz.source.MySourceFunction2.run cancel");
        this.isRunning = false;
    }
}

测试类:

package cn.qz.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Driver that runs {@link MySourceFunction2} in a local Flink environment
 * created with the web UI enabled.
 */
public class CustomSource2 {

    /**
     * Creates the local environment with a default parallelism of 1, registers
     * {@link MySourceFunction2} as the source, raises the parallelism of that
     * source operator to 3, prints every emitted record and executes the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(1);

        MySourceFunction2 source = new MySourceFunction2();
        DataStreamSource<User> users = env.addSource(source);
        // Only the source operator is set to parallelism 3; the environment default stays 1.
        users.setParallelism(3);
        users.print();

        env.execute();
    }
}




举报

相关推荐

0 条评论