1、在 source 处生成水位线(AtSourceGenerateWatermark)
注意:从 Flink 1.17 开始,FLIP-27 Source 框架支持分片(split)级别的水位线对齐(watermark alignment)。
import java.time.Duration;
/**
 * Example 1: hands the watermark strategy to the source itself.
 *
 * <p>A Kafka source of string values is registered through {@code fromSource} together with a
 * bounded-out-of-orderness strategy of 3 seconds. No timestamp assigner is configured on the
 * strategy in this example. The resulting stream is printed.
 */
public class _02_AtSourceGenerateWatermark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer that deserializes only the record value, as a String.
        KafkaSource<String> kafka = KafkaSource.<String>builder()
                .setBootstrapServers("my-broker")
                .setTopics("my-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Watermark strategy passed to the source: tolerate up to 3 seconds of out-of-order data.
        WatermarkStrategy<String> boundedLateness =
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3));

        // Explicit type information for the String elements produced by the source.
        TypeInformation<String> stringType = TypeInformation.of(new TypeHint<String>() {
        });

        DataStreamSource<String> records =
                environment.fromSource(kafka, boundedLateness, "kafka_source", stringType);
        records.print();

        environment.execute();
    }
}
2、在 source 之后生成水位线
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
/**
 * Example 2: assigns timestamps and watermarks after the source, on an already-mapped stream.
 *
 * <p>Text lines are read from a socket on {@code localhost:8888}. Each line must contain at least
 * three comma-separated fields: an integer, a string and a long, in that order. The line is turned
 * into a {@code _01_MyEvent}, and the mapped stream then gets a bounded-out-of-orderness watermark
 * strategy (5 seconds) whose timestamps come from {@code _01_MyEvent#getEventTime()}.
 */
public class _03_AfterSourceGenerateWatermark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<_01_MyEvent> eventMap = source.map(new MapFunction<String, _01_MyEvent>() {
            /**
             * Parses one input line into an event.
             *
             * @param value raw line read from the socket
             * @return the event built from the first three comma-separated fields
             * @throws IllegalArgumentException if the line has fewer than three fields, or if the
             *                                  first or third field is not a valid number
             */
            @Override
            public _01_MyEvent map(String value) throws Exception {
                String[] fields = value.split(",");
                // Fail with the offending line in the message instead of a bare
                // ArrayIndexOutOfBoundsException when a blank or truncated line arrives.
                if (fields.length < 3) {
                    throw new IllegalArgumentException(
                            "Expected at least 3 comma-separated fields but got " + fields.length
                                    + " in line: '" + value + "'");
                }

                final int firstField;
                final long thirdField;
                try {
                    // Trim only the numeric fields so input such as "1, a, 1000" still parses;
                    // the string field is passed through unchanged.
                    firstField = Integer.parseInt(fields[0].trim());
                    thirdField = Long.parseLong(fields[2].trim());
                } catch (NumberFormatException e) {
                    throw new IllegalArgumentException(
                            "First and third fields must be numeric in line: '" + value + "'", e);
                }
                return new _01_MyEvent(firstField, fields[1], thirdField);
            }
        });

        SingleOutputStreamOperator<_01_MyEvent> timestampsAndWatermarks = eventMap.assignTimestampsAndWatermarks(WatermarkStrategy.<_01_MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<_01_MyEvent>() {
                    // Uses the event's own time and ignores the timestamp previously attached to the record.
                    // NOTE(review): Flink treats this value as epoch milliseconds - confirm that
                    // _01_MyEvent#getEventTime() uses that unit.
                    @Override
                    public long extractTimestamp(_01_MyEvent element, long recordTimestamp) {
                        return element.getEventTime();
                    }
                }));
        timestampsAndWatermarks.print();
        env.execute();
    }
}









