Flink 提供了多种内置（默认）的数据源，同时也支持自定义数据源，包括自定义的串行（非并行）数据源和并行数据源。
- 串行数据源
package cn.qz.source;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class MySourceFunction implements SourceFunction<String> {
private volatile boolean isRunning = true;
public void run(SourceContext<String> ctx) throws Exception {
log.info("cn.qz.source.MySourceFunction.run start");
for (int index = 0; index < 100 && isRunning; index++) {
Thread.sleep(2 * 1000);
ctx.collect(RandomStringUtils.randomAlphanumeric(4));
}
log.info("cn.qz.source.MySourceFunction.run end");
}
/**
* 界面点取消任务的时候执行
*/
public void cancel() {
isRunning = false;
log.info("cn.qz.source.MySourceFunction.cancel");
}
}
测试类:
package cn.qz.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CustomSource {

    /**
     * Starts a local Flink environment with the web UI enabled, runs {@link MySourceFunction}
     * with parallelism 1 and prints every string it emits.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);

        DataStreamSource<String> randomStrings = env.addSource(new MySourceFunction());
        randomStrings.print();

        env.execute();
    }
}
- 并行数据源
数据源:
package cn.qz.source;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import scala.collection.Parallel;
public class MySourceFunction2 implements ParallelSourceFunction<User> {
private volatile boolean isRunning = true;
public void run(SourceContext<User> ctx) throws Exception {
log.info("cn.qz.source.MySourceFunction2.run start");
for (int index = 0; index < 3 && isRunning; index++) {
String s = RandomStringUtils.randomAlphabetic(4);
User user = new User(s, s, 100 + index);
ctx.collect(user);
}
log.info("cn.qz.source.MySourceFunction2.run end");
}
public void cancel() {
log.info("cn.qz.source.MySourceFunction2.run cancel");
isRunning = false;
}
}
测试类:
package cn.qz.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CustomSource2 {

    /**
     * Starts a local Flink environment with the web UI enabled and runs {@link MySourceFunction2}
     * as a source with parallelism 3, printing every emitted {@code User}.
     *
     * <p>The environment-wide parallelism is 1; only the source operator overrides it to 3.
     * {@code print()} sets no parallelism of its own.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        DataStreamSource<User> users = env.addSource(new MySourceFunction2());
        // Override the default parallelism for the source operator only.
        users.setParallelism(3);
        users.print();

        env.execute();
    }
}