Flink简单使用教程
一、基础
1.1 环境配置
在pom.xml引入flink的相关依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.0</version>
</dependency>
如果已经引入了Kafka的依赖,为了避免flink和Kafka使用的scala版本不同导致的错误,需要在Kafka的依赖中排除掉对scala的依赖:
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
1.2 基础概念
Flink是一个流批一体的数据处理平台。
自然环境中,数据的产生是流式的。无论是来自 Web 服务器的事件数据、证券交易所的交易数据,还是来自车间机器上的传感数据,其数据都是流式的。
分析数据时,可以围绕 有界流(bounded 或 无界流(unbounded) 两种模型来处理数据。
批处理针对的是有界数据流,可以在计算结果输出之前输入整个数据集。因此,可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。
流处理针对的是无界数据流,理论上来说输入永远不会结束。因此,程序必须持续不断地对到达的数据进行处理。
1.3 什么能被转换成流?
Flink 的DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有:
- 基本类型,即 String、Long、Integer、Boolean、Array
- 复合类型:Tuples、POJOs 和 Scala case classes
Tuple就是一个元组,例如Tuple2就是一个二元组,如下是一个有两个属性的二元组实例:
1.4 DataStream
简单的来说就是需要处理的数据源,DataStream是flink程序中必不可少的一个类,该类用于表示数据集合,可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。
在用法上类似于常规的 Java集合,一旦它们被创建就不能添加或删除元素,也不能简单地察看内部元素,只能使用 DataStream API 操作来处理它们,这种通过DataStream API的操作也叫作转换(transformation)。
1.5 Flink程序基本构成
Flink 程序看起来像一个转换 DataStream
的常规程序。每个程序由相同的基本部分组成:
- 获取一个执行环境(StreamExecutionEnvironment);
- 加载/创建初始数据;
- 指定数据相关的转换;
- 指定计算结果的存储位置;
- 触发程序执行(Execute)。
StreamExecutionEnvironment 是所有 Flink 程序的基础。可以使用 StreamExecutionEnvironment 的静态方法获取 StreamExecutionEnvironment:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
通常,只需要使用 getExecutionEnvironment() ,该方法会根据上下文做正确的处理:如果你在 IDE 中执行你的程序或将其作为一般的 Java 程序执行,那么它将创建一个本地环境,该环境将在你的本地机器上执行你的程序。
如果你基于程序创建了一个 JAR 文件,并通过命令行运行它,Flink 集群管理器将执行程序的 main 方法,返回一个执行环境以在集群上执行你的程序。
1.6 Data Source
Source 是程序读取其输入的地方,通过 StreamExecutionEnvironment`可以访问多种预定义的 stream source:
基于文件:
readTextFile(path)
- 读取文本文件,逐行读取并将它们作为字符串返回。readFile(fileInputFormat, path)
- 按照指定的文件输入格式读取(一次)文件。
基于套接字:
socketTextStream
- 从套接字读取。
基于集合:
fromCollection(Collection)
- 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。fromCollection(Iterator, Class)
- 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。fromElements(T ...)
- 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。fromParallelCollection(SplittableIterator, Class)
- 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。generateSequence(from, to)
- 基于给定间隔内的数字序列并行生成数据流。
自定义:
addSource
- 关联一个新的 source function。例如,你可以使用addSource(new FlinkKafkaConsumer<>(...))
来从 Apache Kafka 获取数据。
1.7 Data Sick
Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。
Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里:
writeAsText()
- 将元素按行写成字符串。writeAsCsv(...)
- 将元组写成逗号分隔值文件。print()
- 在标准输出流上打印每个元素的 toString() 值。writeUsingOutputFormat()
- 自定义文件输出的方法和基类。writeToSocket
- 根据SerializationSchema
将元素写入套接字。addSink
- 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。
二、流处理实例
2.1 通过年龄过滤出成年人和未成年人
public class Example {
public static void main(String[] args) throws Exception {
//1.创建flink的执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.创建数据源
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
//3.对数据进行过滤
DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
//4.数据的处理方式(输出控制台)
adults.print();
//5.执行flink任务
env.execute();
}
public static class Person {
public String name;
public Integer age;
public Person() {}
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
}
}
}
2.2 统计单词个数
基于流窗口的单词统计应用程序,计算 5 秒窗口内来自 Web 套接字的单词数
public class WindowWordCount {
public static void main(String[] args) throws Exception {
//1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.通过socket创建数据源,并进行过滤
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())//调用Splitter类按规则过滤输入字符串,返回的是一个个二元组(单词,单词出现次数)
.keyBy(value -> value.f0) //按照单词进行排序
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))//限定窗口时间
.sum(1);//1表示按照第2个位置的数字进行求和,即计算单词出现的次数
//3.输出处理结果
//需要注意的是,输出结果是源源不断的过程,首先env.execute执行,然后只要flink接收到数据,就会调用注释2.xx和3.xx的代 码,过滤数据,输出结果,中间这一部分代码会一直执行
dataStream.print();
//4.执行程序
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
//通过空格对输入进行分组,每一个单词的属性值(出现次数)均为1
//例如输入:a b c a b,输出为:a 2,b 2,c 1(a出现2次,b出现2次,c出现1次)
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
要运行示例程序,首先从终端使用 netcat (要下载相关软件)启动输入流:
nc -lk 9999
只需输入一些单词,然后按回车键即可传入新单词。
三、批处理实例
public class WordCount {
public static void main(String[] args) throws Exception {
//1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.通过文本文件(有界数据)创建数据源
DataSource<String> dataSource = env.readTextFile("C:\\...")
//3.处理输入
DataSet<Tuple2<String,Integer>> dataSet = dataSource
.flatMap(new Splitter())//调用Splitter类按规则过滤输入字符串,返回的是一个个二元组(单词,单词出现次数)
.groupBy(0) //0表示按照第1个位置的单词进行排序
.sum(1);//1表示按照第2个位置的数字进行求和,即计算单词出现的次数
//4.将处理结果写出到文件
dataSet.writeAsCsv("C:\\...");
//5.执行程序
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
//通过空格对输入进行分组,每一个单词的属性值(出现次数)均为1
//例如输入:a b c a b,输出为:a 2,b 2,c 1(a出现2次,b出现2次,c出现1次)
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
四、flink读写Kafka数据
测试时,首先运行消费者程序,再运行生产者
4.1 创建消费者类FlinkConsumer
public class WindowWordCount {
public static void main(String[] args) throws Exception {
//1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.配置文件
Properties props = new Properties();
props.put("bootstrap.servers","Kafka集群地址");
//3.构造消费者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
//4.配置消费者
DataStreamSource stream = env.addSource(consumer);
//5.data sick
stream.print();
//6.执行程序
env.execute("消费者程序");
}
}
4.2 创建生产者类FlinkProducer
public class WindowWordCount {
public static void main(String[] args) throws Exception {
//1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.配置文件
Properties props = new Properties();
props.put("bootstrap.servers","Kafka集群地址");
//3.构造生产者
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("topic", new SimpleStringSchema(), props);
//4.配置数据源和生产者
env.fromElement("hello", "flink", "kafka").addSink(producer);
//5.执行程序
env.execute("生产者程序");
}
}
4.3 概念
Flink 的 Kafka consumer 称为 FlinkKafkaConsumer。它提供对一个或多个 Kafka topics 的访问。
构造函数接受以下参数:
- Topic 名称或者名称列表
- 用于反序列化 Kafka 数据的 DeserializationSchema 或者 KafkaDeserializationSchema
- Kafka 消费者的属性。需要以下属性:
- “bootstrap.servers”(以逗号分隔的 Kafka broker 列表)
- “group.id” 消费组 ID
Flink Kafka Producer 被称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。
构造器接收下列参数:
- 事件被写入的默认输出 topic
- 序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchema
- Kafka client 的 Properties。下列 property 是必须的:
- “bootstrap.servers” (逗号分隔 Kafka broker 列表)
- 容错语义