KafkaSource调优
动态发现分区
在使用 FlinkKafkaConsumer 时,可以开启 partition 的动态发现。通过 Properties指定参数开启(单位是毫秒):
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS
该参数表示间隔多久检测一次是否有新创建的 partition。默认值是 Long 的最小值,表示不开启,大于 0 表示开启。开启时会启动一个线程根据传入的 interval 定期获取 Kafka最新的元数据,新 partition 对应的那一个 subtask 会自动发现并从 earliest 位置开始消费,新创建的 partition 对其他 subtask 并不会产生影响。
代码如下:
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTE RVAL_MILLIS, 30 * 1000 + "");
从Kafka数据源生成 watermark
Kafka 单分区内有序,多分区间无序。在这种情况下,可以使用 Flink 中可识别 Kafka 分区的 watermark 生成机制。使用此特性,将在 Kafka 消费端内部针对每个 Kafka 分区生成 watermark,并且不同分区 watermark 的合并方式与在数据流 shuffle 时的合并方式相同。
在单分区内有序的情况下,使用时间戳单调递增按分区生成的 watermark 将生成完美的全局 watermark。
可以不使用 TimestampAssigner,直接用 Kafka 记录自身的时间戳:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
"hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
"flinktest",
new SimpleStringSchema(),
properties
);
kafkaSourceFunction.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMinutes(2))
);
env.addSource(kafkaSourceFunction)
设置空闲等待
如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着WatermarkGenerator 也不会获得任何新数据去生成 watermark。称这类数据源为空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。
举栗:
使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
"hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
"flinktest",
new SimpleStringSchema(),
properties
);
kafkaSourceFunction.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMinutes(2))
.withIdleness(Duration.ofMinutes(5))
);
env.addSource(kafkaSourceFunction)
Kafka 的 offset 消费策略
FlinkKafkaConsumer 可以调用以下 API,注意与”auto.offset.reset”区分开:
- setStartFromGroupOffsets():默认消费策略,默认读取上次保存的 offset 信息,如果应用第一次启动,读取不到上次offset信息 ,根据参数auto.offset.reset 的值来进行消费数据。建议使用这个。
- setStartFromEarliest():从最早的数据开始进行消费,忽略存储的 offset 信息
- setStartFromLatest():从最新的数据进行消费,忽略存储的 offset 信息
- setStartFromSpecificOffsets(Map):从指定位置进行消费
- setStartFromTimestamp(long):从 topic 中指定的时间点开始消费,指定时间点之前的数据忽略
- 当 checkpoint 机制开启的时候,KafkaConsumer 会定期把 kafka 的 offset 信息还有其他 operator 的状态信息一块保存起来。当 job 失败重启的时候,Flink 会从最近一次的 checkpoint 中进行恢复数据,重新从保存的 offset 消费 kafka 中的数据(上面几种策略,只有第一次启动的时候起作用)。
- 为了能够使用支持容错的 kafka Consumer,需要开启 checkpoint