以一个简单的入门例子,统计每个单词出现的次数开始。
1. pom配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>study-flink</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!-- 引入Flink相关依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 引入日志管理相关依赖-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Flink 底层的架构使用了Akka 来实现分布式通信,Akka是用Scala 开发的。因此指定了Scala 版本。
2. 编写程序进行测试
1. 项目下新建文件 file/words.txt 内容如下:
hello world
hello flink
hello java
java nb
java pl
2. 批处理程序:
思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。
package cn.qz;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* 批处理逻辑
*/
public class BatchWordCount {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataSource<String> txtDataSource = executionEnvironment.readTextFile("file/words.txt");
// 3. 转换数据格式
FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = txtDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] strs = line.split(" ");
for (String str : strs) {
out.collect(Tuple2.of(str, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG)); // lambda 使用泛型,由于泛型擦除,需要显示的声明类型信息
// 4. 按照word 进行分组(按照第一个字段分组。 也就是按照String 类型的词分组). 有下面两种方式
// 第一种,指定属性名称。 f0 是 org.apache.flink.api.java.tuple.Tuple2.f0
// UnsortedGrouping<Tuple2<String, Long>> tuple2UnsortedGrouping = wordAndOne.groupBy("f0");
UnsortedGrouping<Tuple2<String, Long>> tuple2UnsortedGrouping = wordAndOne.groupBy(0);
// 5. 分组聚合统计
AggregateOperator<Tuple2<String, Long>> sum = tuple2UnsortedGrouping.sum(1);
// 6. 打印结果
sum.print();
}
}
结果:
(nb,1)
(flink,1)
(world,1)
(hello,3)
(java,3)
(pl,1)
可以看到将文档中的所有单词的频次,全部统计打出来。
需要注意,这种批处理是基于DataSetAPI的,也就是数据集API。
3. 流处理-有界数据
DataStreamAPI用于处理流处理。
package cn.qz;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class BoundedStreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境(流处理执行环境)
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataStreamSource<String> txtDataSource = executionEnvironment.readTextFile("file/words.txt");
// 3. 转换数据格式
SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
.flatMap((String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要显示的声明类型信息
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
// 5. 求和 (sum、min、max 可以用字段名称,也可以用字段顺序)
SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
// 6. 打印
sum.print();
// 7. 执行
executionEnvironment.execute();
}
}
与批处理程序不同的是:
1》创建的执行环境不同
2》每一步处理转换之后,得到的数据对象类型不同
3》分组操作调用的是keyBy 方法,可以传入一个匿名函数作为键选择器,指定当前分组的key 是什么。
4》代码末尾需要调用execute 方法开始执行任务。
结果:
5> (world,1)
3> (hello,1)
2> (java,1)
4> (nb,1)
2> (java,2)
2> (pl,1)
7> (flink,1)
3> (hello,2)
2> (java,3)
3> (hello,3)
从结果可以看出。批处理针对每个单词,只会输出一个最终的统计个数。而在流处理的打印结果中,"hello"这个单词每出现一次就会统计一次。这也是流处理的特点,数据逐个处理。
另外,Flink是一个分布式处理引擎。在上面开发环境中,execute 之后实际会用多线程来模拟一个Flink 集群。前面的数字5>、3> 等指示了本地执行的不同线程,对应着Flink 运行时不同的并行资源。可以理解为是线程的资源信息,默认为CPU核数。
4. 流处理-无界数据流
模拟监听socket并且处理接收的数据。
(1) linux 用nc 监听端口 (nc 是linux 自带的一个netcat 工具)
nc -l 7777
(2) 编写代码监听7777 socket 端口并处理数据
package cn.qz;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class SocketStreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境(流处理执行环境)
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataStreamSource<String> txtDataSource = executionEnvironment.socketTextStream("localhost", 7777);
// 3. 转换数据格式
SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
.flatMap((String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要显示的声明类型信息
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
// 6. 打印
sum.print();
// 7. 执行
executionEnvironment.execute();
}
}
(3)测试
1》nc 连接窗口输入如下信息
hello china hello java
2》控制台输出
3> (hello,1)
2> (java,1)
3> (china,1)
3> (hello,2)
5. 本地环境求最大最小
package cn.qz;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import java.io.Serializable;
import java.util.List;
public class LocalEnvTest {
public static void main(String[] args) throws Exception {
// 1. 开启本地环境,默认开启一个线程
LocalEnvironment localEnvironment = ExecutionEnvironment.createLocalEnvironment();
// 2. 构造元素
final User user1 = new User().setName("user1").setAge(12);
User user2 = new User().setName("user2").setAge(139);
User user3 = new User().setName("user3").setAge(14);
DataSource<User> userDataSource = localEnvironment.fromElements(user1, user2, user3);
// 2. 返回二元数组对象
MapOperator<User, Tuple2<Integer, User>> map = userDataSource.map(new MapFunction<User, Tuple2<Integer, User>>() {
public Tuple2 map(User user) throws Exception {
return new Tuple2(user.getAge(), user);
}
});
// 3. 求最大最小
/**
* max与maxBy都能用来排序,他们都只能对Tuple类型数据源生效。
* max用来对指定那一列/多列进行排序,其它列不保证,因此返回结果中指定的一列/多列是最大值,其它列为数据源迭代中的最后一条记录。
* maxBy可以根据指定的一列/多列进行排序,最终返回的是最大的那列对应记录。maxBy本质上就是对Tuple类型数据某个位置元素进行比较排序,类似于索引,最终返回最大的那个元素
*/
List<Tuple2<Integer, User>> collect = map.max(0).collect();
collect.forEach(tmp -> {
System.out.println(tmp);
});
System.out.println("======");
List<Tuple2<Integer, User>> collect2 = map.maxBy(0).collect();
collect2.forEach(tmp -> {
System.out.println(tmp);
});
System.out.println("******");
List<Tuple2<Integer, User>> collect3= map.min(0).collect();
collect3.forEach(tmp -> {
System.out.println(tmp);
});
System.out.println("======");
List<Tuple2<Integer, User>> collect4 = map.minBy(0).collect();
collect4.forEach(tmp -> {
System.out.println(tmp);
});
}
@Data
@Accessors(chain = true)
public static class User implements Serializable {
private String name;
private Integer age;
}
}
结果:(可以看出,max/min 只是针对指定的字段求最大最小,后面的元素是数据源迭代最后一条记录; maxBy/minBy 是找到指定的最大最小,且后面的元素也是与之对应的元素。)
(139,LocalEnvTest.User(name=user3, age=14))
======
(139,LocalEnvTest.User(name=user2, age=139))
******
(12,LocalEnvTest.User(name=user3, age=14))
======
(12,LocalEnvTest.User(name=user1, age=12))
【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】