Table of Contents
- Flink series articles
- I. Maven dependencies
- II. Example (Flink 1.13.6): Kafka source, counting riders at each subway entrance every 10 s
  - 1. Maven dependencies
  - 2. Implementation
    - 1) Java bean
    - 2) Implementation
  - 3. Verification
    - 1) Verification steps
    - 2) Verification
- III. Example (Flink 1.17.0): Kafka source, counting riders at each subway entrance every 10 s
  - 1. Maven dependencies
  - 2. Implementation
    - 1) Java bean
    - 2) Implementation
  - 3. Verification
    - 1) Verification steps
    - 2) Verification
This article demonstrates watermarks in Flink with Kafka as the data source, including detailed verification steps and results.
Apart from the Maven dependencies listed below, the examples have no other prerequisites.
A working Kafka and nc environment is assumed.
I. Maven dependencies
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>
<dependencies>
    <!-- since Flink 1.15 the Java artifacts carry no Scala suffix -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
II. Example (Flink 1.13.6): Kafka source, counting riders at each subway entrance every 10 s
Note that in this example the messages carry no event time of their own: Flink uses the Kafka record timestamp (set when the message is written to Kafka) as the event time, so no withTimestampAssigner is needed either; see the implementation source below.
The job counts the riders at each subway entrance every 10 s and accepts data that arrives up to 3 s late. The watermark is the largest event time seen so far minus 3 s, so, for example, the window [0 s, 10 s) fires once a record with a timestamp of at least 13 s has been seen.
Data format: entrance ID, rider count
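For contrast, here is a minimal sketch, not used in this example, of what explicit event-time assignment would look like if each message carried its own timestamp as a third field. The three-field payload and the eventTime field on Subway are hypothetical; subwayDS is the parsed stream from the implementation below.
// hypothetical variant: payload "sNo,userCount,eventTimeMillis" with an extra
// eventTime field on the bean; event time is read from the data itself rather
// than from the Kafka record timestamp
SingleOutputStreamOperator<Subway> withExplicitTime = subwayDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((subway, kafkaTimestamp) -> subway.getEventTime()));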
1. Maven dependencies
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.13.6</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink connectors; the Scala suffix must match the other artifacts (_2.11 here) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Lombok, required by the @Data bean below -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <!-- logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
2. Implementation
1) Java bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Subway entrance record: entrance ID and rider count.
 * Lombok's @Data generates getSNo() and getUserCount(), which keyBy and sum use below.
 *
 * @author alanchan
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
    private String sNo;
    private Integer userCount;
}
2) Implementation
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/**
 * @author alanchan
 */
public class KafkaWatermarkDemo {

    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        // Kafka connection properties
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "server1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        // create the FlinkKafkaConsumer from these properties
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("t_kafkasource", new SimpleStringSchema(), props);
        // parse each "entranceId,count" line into a Subway record
        DataStream<Subway> subwayDS = env.addSource(kafkaSource).map(new MapFunction<String, Subway>() {
            @Override
            public Subway map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Subway(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // transformation
        // watermark = largest event time seen so far - max allowed out-of-orderness
        SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS.assignTimestampsAndWatermarks(
                // max allowed out-of-orderness: 3 s; the Kafka record timestamp serves as event time
                WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3)));

        // 10 s tumbling event-time window per entrance
        SingleOutputStreamOperator<Subway> result = subwayWithWatermark.keyBy(Subway::getSNo)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum("userCount");

        // sink
        result.print();

        // execute
        env.execute();
    }
}
3. Verification
1) Verification steps
1. Start Kafka and create the topic (a sample creation command follows this list)
2. Start the application
3. Type correctly formatted data into the Kafka console producer
4. Watch the console output of the application
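For step 1, a minimal topic-creation sketch; the partition count and replication factor here are assumptions to adjust for your cluster, and older Kafka releases use --zookeeper instead of --bootstrap-server:
kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_kafkasource --partitions 1 --replication-factor 1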
2) Verification
Starting Kafka, creating the topic, and launching the application are not covered again here; if any step is unclear, see my Kafka column.
1. Enter data in the Kafka console producer
[alanchan@server1 onekeystart]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource
>1,2
>1,3
>1,4
>1,5
>1,6
>1,7
>2,4
>2,6
>2,8
>3,6
2. Watch the console output of the application
7> Subway(sNo=1, userCount=20)
4> Subway(sNo=2, userCount=18)
7> Subway(sNo=1, userCount=7)
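The 7> and 4> prefixes are the indices of the parallel subtasks that printed each record. Which records land in which 10 s window depends on when they were typed, so the exact sums vary between runs; in this run the first window closed with 20 riders at entrance 1 and 18 at entrance 2, while the trailing 1,7 record fell into the following window.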
III. Example (Flink 1.17.0): Kafka source, counting riders at each subway entrance every 10 s
As in the previous example, the messages carry no event time of their own: Flink uses the Kafka record timestamp as the event time, so no withTimestampAssigner is needed; see the implementation source below.
The job counts the riders at each subway entrance every 10 s and accepts data that arrives up to 3 s late.
Data format: entrance ID, rider count
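One Kafka-specific caveat: in the implementation below the WatermarkStrategy is passed straight to env.fromSource, so watermarks are generated per Kafka partition, and a partition that receives no data holds back the combined watermark, which can keep windows from ever firing. A minimal sketch of the usual mitigation, WatermarkStrategy.withIdleness (the 30 s idle timeout is an assumption):
// hypothetical variant: a partition that stays silent for 30 s is marked idle
// and no longer holds back the watermark of the whole stream
DataStreamSource<String> kafkaDS = env.fromSource(
        kafkaSource,
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withIdleness(Duration.ofSeconds(30)),
        "Kafka Source");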
1. Maven dependencies
<!-- Flink Kafka connectors; since Flink 1.15 they carry no Scala suffix -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
2. Implementation
1) Java bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Subway entrance record: entrance ID and rider count.
 * Lombok's @Data generates getSNo() and getUserCount(), which keyBy and sum use below.
 *
 * @author alanchan
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
    private String sNo;
    private Integer userCount;
}
2) Implementation
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import org.datastreamapi.watermarker.bean.Subway;

/**
 * @author alanchan
 */
public class TestKafkaWatermarkDemo {

    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
                .setTopics("alan_kafkasource")
                .setGroupId("flink_kafka")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 3. transformation
        // watermark = largest event time seen so far - max allowed out-of-orderness;
        // the strategy is attached directly to the source, and the Kafka record
        // timestamp serves as the event time
        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource,
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "Kafka Source");
        // parse each "entranceId,count" line into a Subway record
        DataStream<Subway> subwayDS = kafkaDS.map(new MapFunction<String, Subway>() {
            @Override
            public Subway map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Subway(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // 10 s tumbling event-time window per entrance
        SingleOutputStreamOperator<Subway> result = subwayDS.keyBy(Subway::getSNo)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum("userCount");

        // 4. sink
        result.print();

        // 5. execute
        env.execute();
    }
}
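A note on the design: in the 1.13.6 example, watermarks were assigned downstream of addSource with a separate assignTimestampsAndWatermarks call, whereas the KafkaSource API takes the WatermarkStrategy directly in env.fromSource, so timestamps and watermarks are generated inside the source, per Kafka partition, before records enter the pipeline.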
3. Verification
1) Verification steps
1. Start Kafka and create the topic (same command as in section II, but with topic alan_kafkasource)
2. Start the application
3. Type correctly formatted data into the Kafka console producer
4. Watch the console output of the application
2) Verification
Starting Kafka, creating the topic, and launching the application are not covered again here; if any step is unclear, see my Kafka column.
1. Enter data in the Kafka console producer
[alanchan@server1 onekeystart]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_kafkasource
>1,2
>1,3
>1,4
>1,5
>1,6
>1,7
>2,4
>2,6
>2,8
>3,6
2. Watch the console output of the application
7> Subway(sNo=1, userCount=20)
4> Subway(sNo=2, userCount=18)
7> Subway(sNo=1, userCount=7)
This concludes the article's walkthrough of watermarks in Flink with Kafka as the data source, including detailed verification steps and results.