引入jar
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.8.0</version>
</dependency>
<!-- flink整合kafka_2.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
二、处理逻辑
//2、定义环境 => Env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(9);
env.enableCheckpointing(1000);
FlinkKafkaConsumer<String> consumer = this.getConsumer();//调用下面的方法获取数据源
consumer.setStartFromLatest();//消费最新数据
//2、绑定数据源=> resource
DataStream<String> stream = env.addSource(consumer);
//3、批量读取的方法=>
stream.timeWindowAll(Time.milliseconds(500)) //timeWindowAll:时间滚动窗口,滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠
.apply(new ReadKafkaFlinkWindowFunction())//使用自己定义的apply来收集
.addSink(new KafkaBatchSink());//批量的sink方法
env.execute();
2、定义消费者,并且将消费者consumer转成FlinkKafkaConsumer
public FlinkKafkaConsumer<String> getConsumer(){
//定义消费者信息
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.131.147:9092");
properties.put("group.id", "flink-consumer-kafka-group");
properties.put("auto.offset.reset", "latest");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), properties);
return consumer;
}
3、收集数据ReadKafkaFlinkWindowFunction的实现类


总结
分布式处理引擎Flink使用至少一个【job】调度和至少一个【task】实现分布式处理
我这里结合我们项目的场景来给各位说一下该选那种处理。我们的场景为:
1:尽量支持最多的数据落地
2:数据必须要准确。所以我们最终了有界处理,将flink的界限设置为0.5秒,0.5秒内收集的所有数据整体使用一个算
子消费。保证数据的准确和消费高效性。
1、一定要有抛出异常的机制
2、关于并行度parallelism
3、关于checkpoint
4、关于并行度
我们在设置并行度的时候,将里边的数字设置为多少,最终就会有多少个线程来执行任务。
所以大家一定要清楚对于数据准确性高的数据来说,宁愿牺牲多线程带来的效率提升也要只设置一个线程来执行消费。
可能大家没有注意,如果你不设置flink的并行度为1时。它是以的是系统的线程数来作为并行度!这样顺序是会乱的。
5、saveBatch很好
但是我建议你先封装一下或者改为批量的保存。可能大家都知道或者说都用过mybatis plus的saveBatch,它能将一个列表的inseert封装为一条sql(insert into a values(a1),(a2),(a3)),但是我们一条sql的长度过长的话会存在性能问题。建议在批量处理的时候每隔1000条记录saveBatch一次
为什么flink消费kafka比官方的listener都要快
1、并行度和分区处理: Flink 具有高度的并行度支持
2、事件时间处理
状态管理
异步处理模型: