尚硅谷大数据项目之Flink
(作者:尚硅谷大数据研发部)
版本:V1.12.0
Flink简介
初识Flink
Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究项目,2014年4月Stratosphere的代码被复制并捐赠给了Apache软件基金会,参加这个孵化项目的初始成员是Stratosphere系统的核心开发人员,2014年12月,Flink一跃成为Apache软件基金会的顶级项目。
在德语中,Flink一词表示快速和灵巧,项目采用一只松鼠的彩色图案作为logo,这不仅是因为松鼠具有快速和灵巧的特点,还因为柏林的松鼠有一种迷人的红棕色,而Flink的松鼠logo拥有可爱的尾巴,尾巴的颜色与Apache软件基金会的logo颜色相呼应,也就是说,这是一只Apache风格的松鼠。
Flink项目的理念是:“Apache Flink是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
Flink的重要特点
事件驱动型(Event-driven)
事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。比较典型的就是以kafka为代表的消息队列几乎都是事件驱动型应用。(Flink的计算也是事件驱动型)
与之不同的就是SparkStreaming微批次,如图:
事件驱动型:
流与批的世界观
批处理的特点是有界、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。
流处理的特点是无界、实时, 无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。
在spark的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。
而在flink的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。
无界数据流:
无界数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理event。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取event,以便能够推断结果完整性。
有界数据流:
有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理。
分层API
最底层级的抽象仅仅提供了有状态流,它将通过过程函数(Process Function)被嵌入到DataStream API中。底层过程函数(Process Function) 与 DataStream API 相集成,使其可以对某些特定的操作进行底层的抽象,它允许用户可以自由地处理来自一个或多个数据流的事件,并使用一致的容错的状态。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。
实际上,大多数应用并不需要上述的底层抽象,而是针对核心API(Core APIs) 进行编程,比如DataStream API(有界或无界流数据)以及DataSet API(有界数据集)。这些API为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些API处理的数据类型以类(classes)的形式由各自的编程语言所表示。
Table API 是以表为中心的声明式编程,其中表可能会动态变化(在表达流数据时)。Table API遵循(扩展的)关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时API提供可比较的操作,例如select、project、join、group-by、aggregate等。Table API程序声明式地定义了什么逻辑操作应该执行,而不是准确地确定这些操作代码的看上去如何。
尽管Table API可以通过多种类型的用户自定义函数(UDF)进行扩展,其仍不如核心API更具表达能力,但是使用起来却更加简洁(代码量更少)。除此之外,Table API程序在执行之前会经过内置优化器进行优化。
你可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
Flink提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。
目前Flink作为批处理还不是主流,不如Spark成熟,所以DataSet使用的并不是很多。Flink Table API和Flink SQL也并不完善,大多都由各大厂商自己定制。所以我们主要学习DataStream API的使用。实际上Flink作为最接近Google DataFlow模型的实现,是流批统一的观点,所以基本上使用DataStream就可以了。
2020年12月8日发布的最新版本1.12.0, 已经完成实现了真正的流批一体. 写好的一套代码, 即可以处理流式数据, 也可以处理离线数据. 这个与前面版本的处理有界流的方式是不一样的, Flink专门对批处理数据做了优化处理.
Spark or Flink
Spark 和 Flink 一开始都拥有着同一个梦想,他们都希望能够用同一个技术把流处理和批处理统一起来,但他们走了完全不一样的两条路前者是以批处理的技术为根本,并尝试在批处理之上支持流计算;后者则认为流计算技术是最基本的,在流计算的基础之上支持批处理。正因为这种架构上的不同,今后二者在能做的事情上会有一些细微的区别。比如在低延迟场景,Spark 基于微批处理的方式需要同步会有额外开销,因此无法在延迟上做到极致。在大数据处理的低延迟场景,Flink 已经有非常大的优势。
Spark和Flink的主要差别就在于计算模型不同。Spark采用了微批处理模型,而Flink采用了基于操作符的连续流模型。因此,对Apache Spark和Apache Flink的选择实际上变成了计算模型的选择,而这种选择需要在延迟、吞吐量和可靠性等多个方面进行权衡。
如果企业中非要技术选型从Spark和Flink这两个主流框架中选择一个来进行流数据处理,我们推荐使用Flink,主(显而)要(易见)的原因为:
Flink灵活的窗口
Exactly Once语义保证
事件时间(event-time)语义(处理乱序数据或者延迟数据)
这两个原因可以大大的解放程序员, 加快编程效率, 把本来需要程序员花大力气手动完成的工作交给框架完成,棒棒哒,点赞!!!
Flink的应用
应用Flink的场景
事件驱动型应用
事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
事件驱动型应用是在计算存储分离的传统应用基础上进化而来。在传统架构中,应用需要读写远程事务型数据库。
相反,事件驱动型应用是基于状态化流处理来完成。在该设计中,数据和计算不会分离,应用只需访问本地(内存或磁盘)即可获取数据。系统容错性的实现依赖于定期向远程持久化存储写入 checkpoint。下图描述了传统应用和事件驱动型应用架构的区别。
事件驱动型应用无须查询远程数据库,本地数据访问使得它具有更高的吞吐和更低的延迟。而由于定期向远程持久化存储的 checkpoint 工作可以异步、增量式完成,因此对于正常事件处理的影响甚微。事件驱动型应用的优势不仅限于本地数据访问。传统分层架构下,通常多个应用会共享同一个数据库,因而任何对数据库自身的更改(例如:由应用更新或服务扩容导致数据布局发生改变)都需要谨慎协调。反观事件驱动型应用,由于只需考虑自身数据,因此在更改数据表示或服务扩容时所需的协调工作将大大减少。
数据分析应用
Apache Flink 同时支持流式及批量分析应用。
Flink 为持续流式分析和批量分析都提供了良好的支持。具体而言,它内置了一个符合 ANSI 标准的 SQL 接口,将批、流查询的语义统一起来。无论是在记录事件的静态数据集上还是实时事件流上,相同 SQL 查询都会得到一致的结果。同时 Flink 还支持丰富的用户自定义函数,允许在 SQL 中执行定制化代码。如果还需进一步定制逻辑,可以利用 Flink DataStream API 和 DataSet API 进行更低层次的控制。此外,Flink 的 Gelly 库为基于批量数据集的大规模高性能图分析提供了算法和构建模块支持。
数据管道应用
提取-转换-加载(ETL)是一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。
数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。因此它支持从一个不断生成数据的源头读取记录,并将它们以低延迟移动到终点。例如:数据管道可以用来监控文件系统目录中的新文件,并将其数据写入事件日志;另一个应用可能会将事件流物化到数据库或增量构建和优化查询索引。
下图描述了周期性 ETL 作业和持续数据管道的差异。
应用Flink的行业
电商和市场营销
数据报表、广告投放
物联网(IOT)
传感器实时数据采集和显示、实时报警,交通运输业
物流配送和服务业
订单状态实时更新、通知信息推送、电信业基站流量调配
银行和金融业
实时结算和通知推送,实时检测异常行为
应用Flink的企业
Flink快速上手
创建maven项目
POM文件中添加需要的依赖:
|
src/main/resources添加文件:log4j.properties
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
配置idea, 运行的时候包括provided scope
批处理WordCount
// 1. 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)
DataSource lineDS = env.readTextFile(“input/words.txt”);
// 3. 转换数据格式
FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS
.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] split = line.split(" ");
for (String word : split) {
out.collect(Tuple2.of(word, 1L));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG)); // 当Lambda表达式使用 java 泛型的时候, 由于泛型擦除的存在, 需要显示的声明类型信息
// 4. 按照 word 进行分组
UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
// 5. 分组内聚合统计
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
// 6. 打印结果
sum.print();
流处理WordCount
有界流
// 1. 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataStreamSource lineDSS = env.readTextFile(“input/words.txt”);
// 3. 转换数据格式
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
.flatMap((String line, Collector words) -> {
Arrays.stream(line.split(" “)).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
.keyBy(t -> t.f0);
// 5. 求和
Sing leOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
.sum(1);
// 6. 打印
result.print();
// 7. 执行
env.execute();
无界流
// 1. 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataStreamSource lineDSS = env.socketTextStream(“hadoop162”, 9999);
// 3. 转换数据格式
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
.flatMap((String line, Collector words) -> {
Arrays.stream(line.split(” ")).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
.keyBy(t -> t.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
.sum(1);
// 6. 打印
result.print();
// 7. 执行
env.execute();
Flink部署
开发模式
咱们前面在idea中运行Flink程序的方式就是开发模式.
local-cluster模式
Flink中的Local-cluster(本地集群)模式,主要用于测试, 学习.
local-cluster模式配置
local-cluster模式基本属于零配置.
上传Flink的安装包flink-1.12.0-bin-scala_2.11.tgz到hadoop162
解压
tar -zxvf flink-1.12.0-bin-scala_2.11.tgz -C /opt/module
进入目录/opt/module, 复制flink-local
cd /opt/module
cp -r flink-1.12.0 flink-local
在local-cluster模式下运行无界的WordCount
打包idea中的应用
把不带依赖的jar包上传到目录/opt/module/flink-local下
启动本地集群
bin/start-cluster.sh
在hadoop162中启动netcat
nc -lk 9999
注意: 如果没有安装netcat需要先安装:
sudo yum install -y nc
命令行提交Flink应用
bin/flink run -m hadoop162:8081 -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
在浏览器中查看应用执行情况
http://hadoop162:8081
也可以在log日志查看执行结果
cat flink-atguigu-taskexecutor-0-hadoop162.out
也可以在WEB UI提交应用
Standalone模式
Standalone模式又叫独立集群模式.
Standalone模式配置
复制flink-standalone
cp -r flink-1.12.0 flink-standalone
修改配置文件:flink-conf.yaml
jobmanager.rpc.address: hadoop162
修改配置文件:workers
hadoop163
hadoop164
分发flink-standalone到其他节点
Standalone模式运行无界流WorkCount
启动standalone集群
bin/start-cluster.sh
命令行提交Flink应用
bin/flink run -m hadoop162:8081 -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
查看执行情况与本地集群一致.
也支持Web UI界面提交Flink应用
Standalone高可用(HA)
任何时候都有一个 主 JobManager 和多个备用 JobManagers,以便在主节点失败时有备用 JobManagers 来接管集群。这保证了没有单点故障,一旦备 JobManager 接管集群,作业就可以正常运行。主备 JobManager 实例之间没有明显的区别。每个 JobManager 都可以充当主备节点。
修改配置文件: link-conf.yaml
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop162:8020/flink/standalone/ha
high-availability.zookeeper.quorum: hadoop162:2181,hadoop163:2181,hadoop164:2181
high-availability.zookeeper.path.root: /flink-standalone
high-availability.cluster-id: /cluster_atguigu
修改配置文件: masters
hadoop162:8081
hadoop163:8081
分发修改的后配置文件到其他节点
在/etc/profile.d/my.sh中配置环境变量
export HADOOP_CLASSPATH=hadoop classpath
注意:
需要提前保证HAOOP_HOME环境变量配置成功
分发到其他节点
首先启动dfs集群和zookeeper集群
启动standalone HA集群
bin/start-cluster.sh
可以分别访问
http://hadoop162:8081
http://hadoop163:8081
在zkCli.sh中查看谁是leader
get /flink-standalone/cluster_atguigu/leader/rest_server_lock
杀死hadoop162上的Jobmanager, 再看leader
注意: 不管是不是leader从WEB UI上看不到区别, 并且都可以与之提交应用.
Yarn模式
独立部署(Standalone)模式由Flink自身提供计算资源,无需其他框架提供资源,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Flink主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成更靠谱,所以接下来我们来学习在强大的Yarn环境中Flink是如何使用的。(其实是因为在国内工作中,Yarn使用的非常多)
把Flink应用提交给Yarn的ResourceManager, Yarn的ResourceManager会申请容器从Yarn的NodeManager上面. Flink会创建JobManager和TaskManager在这些容器上.Flink会根据运行在JobManger上的job的需要的slot的数量动态的分配TaskManager资源
Yarn模式配置
复制flink-yarn
cp -r flink-1.11.2 flink-yarn
配置环境变量HADOOP_CLASSPATH, 如果前面已经配置可以忽略.
在/etc/profile.d/my.sh中配置
export HADOOP_CLASSPATH=hadoop classpath
Yarn运行无界流WordCount
启动hadoop集群(hdfs, yarn)
运行无界流
bin/flink run -t yarn-per-job -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
在yarn的ResourceManager界面查看执行情况
Flink on Yarn的3种部署模式
Flink提供了yarn上运行的3模式,分别为Application Mode, Session-Cluster和Per-Job-Cluster模式。
Session-Cluster
Session-Cluster模式需要先启动Flink集群,向Yarn申请资源, 资源申请到以后,永远保持不变。以后提交任务都向这里提交。这个Flink集群会常驻在yarn集群中,除非手工停止。
在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交.
缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job.
所以, Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job.
Per-Job-Cluster
一个Job会对应一个Flink集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。
每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
Application Mode
Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行. 只要应用程序执行结束, Flink集群会马上被关闭. 也可以手动停止集群.
与Per-Job-Cluster的区别: 就是Application Mode下, 用户的main函数是在集群中执行的
官方建议:
出于生产的需求, 我们建议使用Per-job or Application Mode,因为他们给应用提供了更好的隔离!
Per-Job-Cluster模式执行无界流WordCount
bin/flink run -d -t yarn-per-job -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
Session-Cluster模式执行无界流WordCount
启动一个Flink-Session
bin/yarn-session.sh -d
在Session上运行Job
bin/flink run -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
会自动找到你的yarn-session启动的Flink集群.也可以手动指定你的yarn-session集群:
bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY ./flink-prepare-1.0-SNAPSHOT.ja
注意: application_XXXX_YY 指的是在yarn上启动的yarn应用
Application Mode模式执行无界流WordCount
bin/flink run-application -t yarn-application -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
Yarn模式高可用
Yarn模式的高可用和Standalone模式的高可用原理不一样.
Standalone模式中, 同时启动多个Jobmanager, 一个为leader其他为standby, 当leader挂了, 其他的才会有一个成为leader.
yarn的高可用是同时只启动一个Jobmanager, 当这个Jobmanager挂了之后, yarn会再次启动一个, 其实是利用的yarn的重试次数来实现的高可用.
在yarn-site.xml中配置
yarn.resourcemanager.am.max-attempts
4
The maximum number of application master execution attempts.
注意: 配置完不要忘记分发, 和重启yarn
在flink-conf.yaml中配置
yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop162:8020/flink/yarn/ha
high-availability.zookeeper.quorum: hadoop162:2181,hadoop163:2181,hadoop164:2181
high-availability.zookeeper.path.root: /flink-yarn
启动yarn-session
杀死Jobmanager, 查看他的复活情况
注意: yarn-site.xml中是它活的次数的上限, flink-conf.xml中的次数应该小于这个值.
Scala REPL
scala 交互环境.
local模式启动 REPL
/opt/module/flink-local » bin/start-scala-shell.sh local
yarn-session 模式启动
先启动一个yarn-session, 然后就可以把shell跑在yarn-session上了
bin/start-scala-shell.sh yarn
K8S & Mesos模式
Mesos是Apache下的开源分布式资源管理框架,它被称为是分布式系统的内核,在Twitter得到广泛使用,管理着Twitter超过30,0000台服务器上的应用部署,但是在国内,依然使用着传统的Hadoop大数据框架,所以国内使用mesos框架的并不多,这里我们就不做过多讲解了。
容器化部署时目前业界很流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetes(k8s),而Flink也在最近的版本中支持了k8s部署模式。这里我们也不做过多的讲解.
Flink运行架构
运行架构
https://ci.apache.org/projects/flink/flink-docs-release-1.11/fig/processes.svg
Flink运行时包含2种进程:1个JobManager和至少1个TaskManager
客户端
严格上说, 客户端不是运行和程序执行的一部分, 而是用于准备和发送dataflow到JobManager. 然后客户端可以断开与JobManager的连接(detached mode), 也可以继续保持与JobManager的连接(attached mode)
客户端作为触发执行的java或者scala代码的一部分运行, 也可以在命令行运行:bin/flink run …
JobManager
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个的JobManager所控制执行。
JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。
JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。
而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
这个进程包含3个不同的组件
ResourceManager
负责资源的管理,在整个 Flink 集群中只有一个 ResourceManager. 注意这个ResourceManager不是Yarn中的ResourceManager, 是Flink中内置的, 只是赶巧重名了而已.
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。
当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。另外,ResourceManager还负责终止空闲的TaskManager,释放计算资源。
Dispatcher
负责接收用户提供的作业,并且负责为这个新提交的作业启动一个新的JobManager 组件. Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。
JobMaster
JobMaster负责管理单个JobGraph的执行.多个Job可以同时运行在一个Flink集群中, 每个Job都有一个自己的JobMaster.
TaskManager
Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
核心概念
TaskManager与Slots
Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个Task。为了控制一个worker能接收多少个task,worker通过Task Slot来进行控制(一个worker至少有一个Task Slot)。
这里的Slot如何来理解呢?很多的文章中经常会和Spark框架进行类比,将Slot类比为Core,其实简单这么类比是可以的,可实际上,可以考虑下,当Spark申请资源后,这个Core执行任务时有可能是空闲的,但是这个时候Spark并不能将这个空闲下来的Core共享给其他Job使用,所以这里的Core是Job内部共享使用的。接下来我们再回想一下,之前在Yarn Session-Cluster模式时,其实是可以并行执行多个Job的,那如果申请两个Slot,而执行Job时,只用到了一个,剩下的一个怎么办?那我们自认而然就会想到可以将这个Slot给并行的其他Job,对吗?所以Flink中的Slot和Spark中的Core还是有很大区别的。
每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个task将不需要跟来自其他job的task竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存。
Parallelism(并行度)
一个特定算子的子任务(subtask)的个数被称之为这个算子的并行度(parallelism),一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类。
One-to-one:
stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着flatmap 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。类似于spark中的窄依赖
Redistributing:
stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy()基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。类似于spark中的宽依赖
Task与SubTask
一个算子就是一个Task. 一个算子的并行度是几, 这个Task就有几个SubTask
Operator Chains(任务链)
相同并行度的one to one操作,Flink将这样相连的算子链接在一起形成一个task,原来的算子成为里面的一部分。 每个task被一个线程执行.
将算子链接成task是非常有效的优化:它能减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。链接的行为可以在编程API中进行指定。
ExecutionGraph(执行图)
由Flink程序直接映射成的数据流图是StreamGraph,也被称为逻辑流图,因为它们表示的是计算逻辑的高级视图。为了执行一个流处理程序,Flink需要将逻辑流图转换为物理数据流图(也叫执行图),详细说明程序的执行方式。
Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> Physical Graph。
StreamGraph:
是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
JobGraph:
StreamGraph经过优化后生成了 JobGraph,是提交给 JobManager 的数据结构。主要的优化为: 将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
ExecutionGraph:
JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
Physical Graph:
JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
2个并发度(Source为1个并发度)的 SocketTextStreamWordCount 四层执行图的演变过程
env.socketTextStream().flatMap(…).keyBy(0).sum(1).print();
提交流程
高级视角提交流程(通用提交流程)
我们来看看当一个应用提交执行时,Flink的各个组件是如何交互协作的:
yarn-cluster提交流程per-job
Flink任务提交后,Client向HDFS上传Flink的Jar包和配置
向Yarn ResourceManager提交任务,ResourceManager分配Container资源
通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager
ApplicationMaster向ResourceManager申请资源启动TaskManager
ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。
Flink流处理核心编程
和其他所有的计算框架一样,Llink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分
Environment
Flink Job在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。而这个环境对象的获取方式相对比较简单
// 批处理环境
ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
// 流式数据处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Source
Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源(Source)。
准备工作
导入注解工具依赖, 方便生产POJO类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
- @Author lzc
- @Date 2020/12/8 22:22
- 水位传感器:用于接收水位数据
- id:传感器编号
- ts:时间戳
- vc:水位
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
private String id;
private Long ts;
private Integer vc;
}
从Java的集合中读取数据
一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的。
package com.atguigu.flink.java.chapter_5;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/8 22:44
*/
public class Flink01_Source_Collection {
public static void main(String[] args) throws Exception {
List waterSensors = Arrays.asList(
new WaterSensor(“ws_001”, 1577844001L, 45),
new WaterSensor(“ws_002”, 1577844015L, 43),
new WaterSensor(“ws_003”, 1577844020L, 42));// 1. 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .fromCollection(waterSensors) .print(); env.execute();
}
}
从文件读取数据
package com.atguigu.flink.java.chapter_5;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/8 22:44
*/
public class Flink02_Source_File {
public static void main(String[] args) throws Exception {// 1. 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .readTextFile("input") .print(); env.execute();
}
}
说明:
参数可以是目录也可以是文件
路径可以是相对路径也可以是绝对路径
相对路径是从系统属性user.dir获取路径: idea下是project的根目录, standalone模式下是集群节点根目录
也可以从hdfs目录下读取, 使用路径:hdfs://…, 由于Flink没有提供hadoop相关依赖, 需要pom中添加相关依赖:
org.apache.hadoop
hadoop-client
3.1.3
provided
从Socket读取数据
参考第1章无界流读取
从Kafka读取数据
添加相应的依赖
org.apache.flink
flink-connector-kafka_2.11
1.11.2
参考代码
package com.atguigu.flink.java.chapter_5;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/8 22:44
*/
public class Flink03_Source_Kafka {
public static void main(String[] args) throws Exception {// 0.Kafka相关配置 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092"); properties.setProperty("group.id", "Flink01_Source_Kafka"); properties.setProperty("auto.offset.reset", "latest"); // 1. 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .addSource(new FlinkKafkaConsumer<>("sensor", new SimpleStringSchema(), properties)) .print("kafka source"); env.execute();
}
}
自定义Source
大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.
package com.atguigu.flink.java.chapter_5;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/8 22:44
*/
public class Flink04_Source_Custom {
public static void main(String[] args) throws Exception {// 1. 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .addSource(new MySource("hadoop102", 9999)) .print(); env.execute();
}
public static class MySource implements SourceFunction {
private String host;
private int port;
private volatile boolean isRunning = true;
private Socket socket;public MySource(String host, int port) { this.host = host; this.port = port; } @Override public void run(SourceContext<WaterSensor> ctx) throws Exception { // 实现一个从socket读取数据的source socket = new Socket(host, port); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); String line = null; while (isRunning && (line = reader.readLine()) != null) { String[] split = line.split(","); ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]))); } } /** * 大多数的source在run方法内部都会有一个while循环, * 当调用这个方法的时候, 应该可以让run方法中的while循环结束 */ @Override public void cancel() { isRunning = false; try { socket.close(); } catch (IOException e) { e.printStackTrace(); } }
}
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50
*/
Transform
转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.
map
作用
将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素
参数
lambda表达式或MapFunction实现类
返回
DataStream → DataStream
示例
得到一个新的数据流: 新的流的元素是原来流的元素的平方
匿名内部类对象
package com.atguigu.flink.java.chapter_5.transform;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 7:17
*/
public class Flink01_TransForm_Map_Anonymous {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env .fromElements(1, 2, 3, 4, 5) .map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return value * value; } }) .print(); env.execute();
}
}
Lambda表达式表达式
env
.fromElements(1, 2, 3, 4, 5)
.map(ele -> ele * ele)
.print();
静态内部类
package com.atguigu.flink.java.chapter_5.transform;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 7:17
*/
public class Flink01_TransForm_Map_StaticClass {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env .fromElements(1, 2, 3, 4, 5) .map(new MyMapFunction()) .print(); env.execute();
}
public static class MyMapFunction implements MapFunction<Integer, Integer> {
@Override public Integer map(Integer value) throws Exception { return value * value; }
}
}
Rich…Function类
所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能。例如:RichMapFunction
package com.atguigu.flink.java.chapter_5.transform;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 7:17
*/
public class Flink01_TransForm_Map_RichMapFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(5);env .fromElements(1, 2, 3, 4, 5) .map(new MyRichMapFunction()).setParallelism(2) .print(); env.execute();
}
public static class MyRichMapFunction extends RichMapFunction<Integer, Integer> {
// 默认生命周期方法, 初始化方法, 在每个并行度上只会被调用一次
@Override
public void open(Configuration parameters) throws Exception {
System.out.println(“open … 执行一次”);
}// 默认生命周期方法, 最后一个方法, 做一些清理工作, 在每个并行度上只调用一次 @Override public void close() throws Exception { System.out.println("close ... 执行一次"); } @Override public Integer map(Integer value) throws Exception { System.out.println("map ... 一个元素执行一次"); return value * value; }
}
}默认生命周期方法, 初始化方法, 在每个并行度上只会被调用一次, 而且先被调用
默认生命周期方法, 最后一个方法, 做一些清理工作, 在每个并行度上只调用一次, 而且是最后被调用
getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态. 开发人员在需要的时候自行调用获取运行时上下文对象.
flatMap
作用
消费一个元素并产生零个或多个元素
参数
FlatMapFunction实现类
返回
DataStream → DataStream
示例
匿名内部类写法
// 新的流存储每个元素的平方和3次方
env
.fromElements(1, 2, 3, 4, 5)
.flatMap(new FlatMapFunction<Integer, Integer>() {
@Override
public void flatMap(Integer value, Collector out) throws Exception {
out.collect(value * value);
out.collect(value * value * value);
}
})
.print();
Lambda表达式写法
env
.fromElements(1, 2, 3, 4, 5)
.flatMap((Integer value, Collector out) -> {
out.collect(value * value);
out.collect(value * value * value);
}).returns(Types.INT)
.print();
说明: 在使用Lambda表达式表达式的时候, 由于泛型擦除的存在, 在运行的时候无法获取泛型的具体类型, 全部当做Object来处理, 及其低效, 所以Flink要求当参数中有泛型的时候, 必须明确指定泛型的类型.
filter
作用
根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃
参数
FlatMapFunction实现类
返回
DataStream → DataStream
示例
匿名内部类写法
// 保留偶数, 舍弃奇数
env
.fromElements(10, 3, 5, 9, 20, 8)
.filter(new FilterFunction() {
@Override
public boolean filter(Integer value) throws Exception {
return value % 2 == 0;
}
})
.print();
Lambda表达式写法
env
.fromElements(10, 3, 5, 9, 20, 8)
.filter(value -> value % 2 == 0)
.print();
keyBy
作用
把流中的数据分到不同的分区中.具有相同key的元素会分到同一个分区中.一个分区中可以有多重不同的key.
在内部是使用的hash分区来实现的.
参数
Key选择器函数: interface KeySelector<IN, KEY>
注意: 什么值不可以作为KeySelector的Key:
没有覆写hashCode方法的POJO, 而是依赖Object的hashCode. 因为这样分组没有任何的意义: 每个元素都会得到一个独立无二的组. 实际情况是:可以运行, 但是分的组没有意义.
任何类型的数组
返回
DataStream → KeyedStream
示例
匿名内部类写法
// 奇数分一组, 偶数分一组
env
.fromElements(10, 3, 5, 9, 20, 8)
.keyBy(new KeySelector<Integer, String>() {
@Override
public String getKey(Integer value) throws Exception {
return value % 2 == 0 ? “偶数” : “奇数”;
}
})
.print();
env.execute();
Lambda表达式写法
env
.fromElements(10, 3, 5, 9, 20, 8)
.keyBy(value -> value % 2 == 0 ? “偶数” : “奇数”)
.print();
shuffle
作用
把流中的元素随机打乱. 对同一个组数据, 每次执行得到的结果都不同.
参数
无
返回
DataStream → DataStream
示例
env
.fromElements(10, 3, 5, 9, 20, 8)
.shuffle()
.print();
env.execute();
split和select
已经过时, 在1.12中已经被移除
作用
在某些情况下,我们需要将数据流根据某些特征拆分成两个或者多个数据流,给不同数据流增加标记以便于从流中取出.
split用于给流中的每个元素添加标记. select用于根据标记取出对应的元素, 组成新的流.
参数
split参数: interface OutputSelector
select参数: 字符串
返回
split: SingleOutputStreamOperator -> SplitStream
slect: SplitStream -> DataStream
示例
匿名内部类写法
// 奇数一个流, 偶数一个流
SplitStream splitStream = env
.fromElements(10, 3, 5, 9, 20, 8)
.split(new OutputSelector() {
@Override
public Iterable select(Integer value) {
return value % 2 == 0
? Collections.singletonList(“偶数”)
: Collections.singletonList(“奇数”);
}
});
splitStream
.select(“偶数”)
.print(“偶数”);
splitStream
.select(“奇数”)
.print(“奇数”);
env.execute();
Lambda表达式写法
// 奇数一个流, 偶数一个流
SplitStream splitStream = env
.fromElements(10, 3, 5, 9, 20, 8)
.split(value -> value % 2 == 0
? Collections.singletonList(“偶数”)
: Collections.singletonList(“奇数”));
splitStream
.select(“偶数”)
.print(“偶数”);
splitStream
.select(“奇数”)
.print(“奇数”);
env.execute();
connect
作用
在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
参数
另外一个流
返回
DataStream[A], DataStream[B] -> ConnectedStreams[A,B]
示例
DataStreamSource intStream = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource stringStream = env.fromElements(“a”, “b”, “c”);
// 把两个流连接在一起: 貌合神离
ConnectedStreams<Integer, String> cs = intStream.connect(stringStream);
cs.getFirstInput().print(“first”);
cs.getSecondInput().print(“second”);
env.execute();
注意:
两个流中存储的数据类型可以不同
只是机械的合并在一起, 内部仍然是分离的2个流
只能2个流进行connect, 不能有第3个参与
union
作用
对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream
示例
DataStreamSource stream1 = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource stream2 = env.fromElements(10, 20, 30, 40, 50);
DataStreamSource stream3 = env.fromElements(100, 200, 300, 400, 500);
// 把多个流union在一起成为一个流, 这些流中存储的数据类型必须一样: 水乳交融
stream1
.union(stream2)
.union(stream3)
.print();
connect与 union 区别:
union之前两个或多个流的类型必须是一样,connect可以不一样
connect只能操作两个流,union可以操作多个。
简单滚动聚合算子
常见的滚动聚合算子
sum, min,max
minBy,maxBy
作用
KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream
参数
如果流中存储的是POJO或者scala的样例类, 参数使用字段名. 如果流中存储的是元组, 参数就是位置(基于0…)
返回
KeyedStream -> SingleOutputStreamOperator
示例
示例1
DataStreamSource stream = env.fromElements(1, 2, 3, 4, 5);
KeyedStream<Integer, String> kbStream = stream.keyBy(ele -> ele % 2 == 0 ? “奇数” : “偶数”);
kbStream.sum(0).print(“sum”);
kbStream.max(0).print(“max”);
kbStream.min(0).print(“min”);
示例2
ArrayList waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor(“sensor_1”, 1607527992000L, 20));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527994000L, 50));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527996000L, 30));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527993000L, 10));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527995000L, 30));
KeyedStream<WaterSensor, String> kbStream = env
.fromCollection(waterSensors)
.keyBy(WaterSensor::getId);
kbStream
.sum(“vc”)
.print(“maxBy…”);
注意:
分组聚合后, 理论上只能取分组字段和聚合结果, 但是Flink允许其他的字段也可以取出来, 其他字段默认情况是取的是这个组内第一个元素的字段值
示例3:
ArrayList waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor(“sensor_1”, 1607527992000L, 20));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527994000L, 50));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527996000L, 50));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527993000L, 10));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527995000L, 30));
KeyedStream<WaterSensor, String> kbStream = env
.fromCollection(waterSensors)
.keyBy(WaterSensor::getId);
kbStream
.maxBy(“vc”, false)
.print(“max…”);
env.execute();
注意:
maxBy和minBy可以指定当出现相同值的时候,其他字段是否取第一个. true表示取第一个, false表示取与最大值(最小值)同一行的.
reduce
作用
一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
为什么还要把中间值也保存下来? 考虑流式数据的特点: 没有终点, 也就没有最终的概念了. 任何一个中间的聚合结果都是值!
参数
interface ReduceFunction
返回
KeyedStream -> SingleOutputStreamOperator
示例
匿名内部类写法
ArrayList waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor(“sensor_1”, 1607527992000L, 20));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527994000L, 50));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527996000L, 50));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527993000L, 10));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527995000L, 30));
KeyedStream<WaterSensor, String> kbStream = env
.fromCollection(waterSensors)
.keyBy(WaterSensor::getId);
kbStream
.reduce(new ReduceFunction() {
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
System.out.println(“reducer function …”);
return new WaterSensor(value1.getId(), value1.getTs(), value1.getVc() + value2.getVc());
}
})
.print(“reduce…”);
env.execute();
Lambda表达式写法
kbStream
.reduce((value1, value2) -> {
System.out.println(“reducer function …”);
return new WaterSensor(value1.getId(), value1.getTs(), value1.getVc() + value2.getVc());
})
.print(“reduce…”);
注意:
聚合后结果的类型, 必须和原来流中元素的类型保持一致!
process
作用
process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身)
示例1: 在keyBy之前的流上使用
env
.fromCollection(waterSensors)
.process(new ProcessFunction<WaterSensor, Tuple2<String, Integer>>() {
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<>(value.getId(), value.getVc()));
}
})
.print();
示例2: 在keyBy之后的流上使用
env
.fromCollection(waterSensors)
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, Tuple2<String, Integer>>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<>(“key是:” + ctx.getCurrentKey(), value.getVc()));
}
})
.print();
对流重新分区的几个算子
KeyBy
先按照key分组, 按照key的双重hash来选择后面的分区
shuffle
对流中的元素随机分区
reblance
对流中的元素平均分布到每个区.当处理倾斜数据的时候, 进行性能优化
rescale
同 rebalance一样, 也是平均循环的分布数据. 但是要比rebalance更高效, 因为rescale不需要通过网络, 完全走的"管道"
Sink
Sink有下沉的意思,在Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作.
之前我们一直在使用的print方法其实就是一种Sink
public DataStreamSink print(String sinkIdentifier) {
PrintSinkFunction printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
return addSink(printFunction).name(“Print to Std. Out”);
}
Flink内置了一些Sink, 除此之外的Sink需要用户自定义!
KafkaSink
添加Kafka Connector依赖
org.apache.flink
flink-connector-kafka_2.11
1.11.2
com.alibaba
fastjson
1.2.75
启动Kafka集群
Sink到Kafka的示例代码
package com.atguigu.flink.java.chapter_5.sink;
import com.alibaba.fastjson.JSON;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.ArrayList;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 14:46
*/
public class Flink01_Sink_Kafka {
public static void main(String[] args) throws Exception {
ArrayList waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor(“sensor_1”, 1607527992000L, 20));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527994000L, 50));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527996000L, 50));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527993000L, 10));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527995000L, 30));StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); env .fromCollection(waterSensors) .map(JSON::toJSONString) .addSink(new FlinkKafkaProducer<String>("hadoop102:9092", "topic_sensor", new SimpleStringSchema())); env.execute();
}
}
在linux启动一个消费者, 查看是否收到数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_sensorRedisSink
添加Redis Connector依赖
import com.alibaba.fastjson.JSON;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import java.util.ArrayList;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 14:46
*/
public class Flink02_Sink_Redis {
public static void main(String[] args) throws Exception {
ArrayList waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor(“sensor_1”, 1607527992000L, 20));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527994000L, 50));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527996000L, 50));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527993000L, 10));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527995000L, 30));// 连接到Redis的配置 FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder() .setHost("hadoop102") .setPort(6379) .setMaxTotal(100) .setTimeout(1000 * 10) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); env .fromCollection(waterSensors) .addSink(new RedisSink<>(redisConfig, new RedisMapper<WaterSensor>() { /* key value(hash) "sensor" field value sensor_1 {"id":"sensor_1","ts":1607527992000,"vc":20} ... ... */ @Override public RedisCommandDescription getCommandDescription() { // 返回存在Redis中的数据类型 存储的是Hash, 第二个参数是外面的key return new RedisCommandDescription(RedisCommand.HSET, "sensor"); } @Override public String getKeyFromData(WaterSensor data) { // 从数据中获取Key: Hash的Key return data.getId(); } @Override public String getValueFromData(WaterSensor data) { // 从数据中获取Value: Hash的value return JSON.toJSONString(data); } })); env.execute();
}
}
Redis查看是否收到数据
redis-cli --raw
注意:
发送了5条数据, redis中只有2条数据. 原因是hash的field的重复了, 后面的会把前面的覆盖掉
ElasticsearchSink
添加Elasticsearch Connector依赖
import com.alibaba.fastjson.JSON;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 14:46
*/
public class Flink03_Sink_ES {
public static void main(String[] args) throws Exception {
ArrayList waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor(“sensor_1”, 1607527992000L, 20));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527994000L, 50));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527996000L, 50));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527993000L, 10));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527995000L, 30));List<HttpHost> esHosts = Arrays.asList( new HttpHost("hadoop102", 9200), new HttpHost("hadoop103", 9200), new HttpHost("hadoop104", 9200)); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); env .fromCollection(waterSensors) .addSink(new ElasticsearchSink.Builder<WaterSensor>(esHosts, new ElasticsearchSinkFunction<WaterSensor>() { @Override public void process(WaterSensor element, RuntimeContext ctx, RequestIndexer indexer) { // 1. 创建es写入请求 IndexRequest request = Requests .indexRequest("sensor") .type("_doc") .id(element.getId()) .source(JSON.toJSONString(element), XContentType.JSON); // 2. 写入到es indexer.add(request); } }).build()); env.execute();
}
}
Elasticsearch查看是否收到数据
注意
如果出现如下错误:
添加log4j2的依赖:
org.apache.logging.log4j
log4j-to-slf4j
2.14.0
如果是无界流, 需要配置bulk的缓存
esSinkBuilder.setBulkFlushMaxActions(1);
自定义Sink
如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,怎么办?
我们自定义一个到Mysql的Sink
在mysql中创建数据库和表
create database test;
use test;
CREATE TABLE sensor
(
id
varchar(20) NOT NULL,
ts
bigint(20) NOT NULL,
vc
int(11) NOT NULL,
PRIMARY KEY (id
,ts
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
导入Mysql驱动
mysql
mysql-connector-java
5.1.49
写到Mysql的自定义Sink示例代码
package com.atguigu.flink.java.chapter_5.sink;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 14:46
*/
public class Flink04_Sink_Custom {
public static void main(String[] args) throws Exception {
ArrayList waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor(“sensor_1”, 1607527992000L, 20));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527994000L, 50));
waterSensors.add(new WaterSensor(“sensor_1”, 1607527996000L, 50));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527993000L, 10));
waterSensors.add(new WaterSensor(“sensor_2”, 1607527995000L, 30));StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); env.fromCollection(waterSensors) .addSink(new RichSinkFunction<WaterSensor>() { private PreparedStatement ps; private Connection conn; @Override public void open(Configuration parameters) throws Exception { conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test?useSSL=false", "root", "aaaaaa"); ps = conn.prepareStatement("insert into sensor values(?, ?, ?)"); } @Override public void close() throws Exception { ps.close(); conn.close(); } @Override public void invoke(WaterSensor value, Context context) throws Exception { ps.setString(1, value.getId()); ps.setLong(2, value.getTs()); ps.setInt(3, value.getVc()); ps.execute(); } }); env.execute();
}
}执行模式(Execution Mode)
Flink在1.12.0上对流式API新增一项特性:可以根据你的使用情况和Job的特点, 可以选择不同的运行时执行模式(runtime execution modes).
流式API的传统执行模式我们称之为STREAMING 执行模式, 这种模式一般用于无界流, 需要持续的在线处理
1.12.0新增了一个BATCH执行模式, 这种执行模式在执行方式上类似于MapReduce框架. 这种执行模式一般用于有界数据.
默认是使用的STREAMING 执行模式
选择执行模式
BATCH执行模式仅仅用于有界数据, 而STREAMING 执行模式可以用在有界数据和无界数据.
一个公用的规则就是: 当你处理的数据是有界的就应该使用BATCH执行模式, 因为它更加高效. 当你的数据是无界的, 则必须使用STREAMING 执行模式, 因为只有这种模式才能处理持续的数据流.
配置BATH执行模式
执行模式有3个选择可配:
STREAMING(默认)
BATCH
AUTOMATIC
通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH …
通过代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
建议: 不要在运行时配置(代码中配置), 而是使用命令行配置, 引用这样会灵活: 同一个应用即可以用于无界数据也可以用于有界数据
有界数据用STREAMING和BATCH的区别
STREAMING模式下, 数据是来一条输出一次结果.
BATCH模式下, 数据处理完之后, 一次性输出结果.
下面展示WordCount的程序读取文件内容在不同执行模式下的执行结果对比:
流式模式
// 默认流式模式, 可以不用配置
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);批处理模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);自动模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
注意:
批模式下, 如果单词只有1个, 则不会输出, 原因是因为没有保存状态. 关于状态的概念后面再讲.
Flink流处理核心编程实战
基于埋点日志数据的网络流量统计
网站总浏览量(PV)的统计
衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page View,PV)。用户每次打开一个页面便记录1次PV,多次打开同一页面则浏览量累计。
一般来说,PV与来访者的数量成正比,但是PV并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的PV。接下来我们就用咱们之前学习的Flink算子来实现PV的统计
准备数据
把数据文件 UserBehavior 复制到project的input目录下
用于封装数据的JavaBean类
package com.atguigu.flink.java.chapter_6;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
- @Author lizhenchao@atguigu.cn
- @Date 2020/12/10 19:32
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserBehavior {
private Long userId;
private Long itemId;
private Integer categoryId;
private String behavior;
private Long timestamp;
}
pv实现思路1: WordCount
package com.atguigu.flink.java.chapter_6;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 19:30
*/
public class Flink01_Project_PV {
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .readTextFile("input/UserBehavior.csv") .map(line -> { // 对数据切割, 然后封装到POJO中 String[] split = line.split(","); return new UserBehavior( Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4])); }) .filter(behavior -> "pv".equals(behavior.getBehavior())) //过滤出pv行为 .map(behavior -> Tuple2.of("pv", 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG)) // 使用Tuple类型, 方便后面求和 .keyBy(value -> value.f0) // keyBy: 按照key分组 .sum(1) // 求和 .print(); env.execute();
}
}
pv实现思路2: process
package com.atguigu.flink.java.chapter_6;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 19:30
*/
public class Flink02_Project_PV {
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .readTextFile("input/UserBehavior.csv") .map(line -> { // 对数据切割, 然后封装到POJO中 String[] split = line.split(","); return new UserBehavior( Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4])); }) .filter(behavior -> "pv".equals(behavior.getBehavior())) //过滤出pv行为 .keyBy(UserBehavior::getBehavior) .process(new KeyedProcessFunction<String, UserBehavior, Long>() { long count = 0; @Override public void processElement(UserBehavior value, Context ctx, Collector<Long> out) throws Exception { count++; out.collect(count); } }) .print(); env.execute();
}
}
网站独立访客数(UV)的统计
上一个案例中,我们统计的是所有用户对页面的所有浏览行为,也就是说,同一用户的浏览行为会被重复统计。而在实际应用中,我们往往还会关注,到底有多少不同的用户访问了网站,所以另外一个统计流量的重要指标是网站的独立访客数(Unique Visitor,UV)
准备数据
对于UserBehavior数据源来说,我们直接可以根据userId来区分不同的用户.
UV实现思路
package com.atguigu.flink.java.chapter_6;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.HashSet;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 19:30
*/
public class Flink02_Project_UV {
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .readTextFile("input/UserBehavior.csv") .flatMap((String line, Collector<Tuple2<String, Long>> out) -> { String[] split = line.split(","); UserBehavior behavior = new UserBehavior( Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4])); if ("pv".equals(behavior.getBehavior())) { out.collect(Tuple2.of("uv", behavior.getUserId())); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)) .keyBy(t -> t.f0) .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Integer>() { HashSet<Long> userIds = new HashSet<>(); @Override public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Integer> out) throws Exception { userIds.add(value.f1); out.collect(userIds.size()); } }) .print("uv"); env.execute();
}
}
市场营销商业指标统计分析
随着智能手机的普及,在如今的电商网站中已经有越来越多的用户来自移动端,相比起传统浏览器的登录方式,手机APP成为了更多用户访问电商网站的首选。对于电商企业来说,一般会通过各种不同的渠道对自己的APP进行市场推广,而这些渠道的统计数据(比如,不同网站上广告链接的点击量、APP下载量)就成了市场营销的重要商业指标。
APP市场推广统计 - 分渠道
封装数据的JavaBean类
package com.atguigu.flink.java.chapter_6;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
- @Author lizhenchao@atguigu.cn
- @Date 2020/12/10 21:41
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MarketingUserBehavior {
private Long userId;
private String behavior;
private String channel;
private Long timestamp;
}
具体实现代码
package com.atguigu.flink.java.chapter_6;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 21:44
*/
public class Flink04_Project_AppAnalysis_By_Chanel {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env .addSource(new AppMarketingDataSource()) .map(behavior -> Tuple2.of(behavior.getChannel() + "_" + behavior.getBehavior(), 1L)) .returns(Types.TUPLE(Types.STRING, Types.LONG)) .keyBy(t -> t.f0) .sum(1) .print(); env.execute();
}
public static class AppMarketingDataSource extends RichSourceFunction {
boolean canRun = true;
Random random = new Random();
List channels = Arrays.asList(“huawwei”, “xiaomi”, “apple”, “baidu”, “qq”, “oppo”, “vivo”);
List behaviors = Arrays.asList(“download”, “install”, “update”, “uninstall”);@Override public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception { while (canRun) { MarketingUserBehavior marketingUserBehavior = new MarketingUserBehavior( (long) random.nextInt(1000000), behaviors.get(random.nextInt(behaviors.size())), channels.get(random.nextInt(channels.size())), System.currentTimeMillis()); ctx.collect(marketingUserBehavior); Thread.sleep(2000); } } @Override public void cancel() { canRun = false; }
}
}APP市场推广统计 - 不分渠道
env
.addSource(new AppMarketingDataSource())
.map(behavior -> Tuple2.of(behavior.getBehavior(), 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.keyBy(t -> t.f0)
.sum(1)
.print();
各省份页面广告点击量实时统计
电商网站的市场营销商业指标中,除了自身的APP推广,还会考虑到页面上的广告投放(包括自己经营的产品和其它网站的广告)。所以广告相关的统计分析,也是市场营销的重要指标。
对于广告的统计,最简单也最重要的就是页面广告的点击量,网站往往需要根据广告点击量来制定定价策略和调整推广方式,而且也可以借此收集用户的偏好信息。更加具体的应用是,我们可以根据用户的地理位置进行划分,从而总结出不同省份用户对不同广告的偏好,这样更有助于广告的精准投放。
数据准备
在咱们当前的案例中,给大家准备了某电商网站的广告点击日志数据AdClickLog.csv, 本日志数据文件中包含了某电商网站一天用户点击广告行为的事件流,数据集的每一行表示一条用户广告点击行为,由用户ID、广告ID、省份、城市和时间戳组成并以逗号分隔。
将文件放置项目目录: input下
定义用来封装数据的JavaBean
package com.atguigu.flink.java.chapter_6;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
- @Author lizhenchao@atguigu.cn
- @Date 2020/12/10 22:30
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AdsClickLog {
private Long userId;
private Long adId;
private String province;
private String city;
private Long timestamp;
}
具体实现代码
package com.atguigu.flink.java.chapter_6;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import static org.apache.flink.api.common.typeinfo.Types.*;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 22:29
*/
public class Flink06_Project_Ads_Click {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env .readTextFile("input/AdClickLog.csv") .map(line -> { String[] datas = line.split(","); return new AdsClickLog( Long.valueOf(datas[0]), Long.valueOf(datas[1]), datas[2], datas[3], Long.valueOf(datas[4])); }) .map(log -> Tuple2.of(Tuple2.of(log.getProvince(), log.getAdId()), 1L)) .returns(TUPLE(TUPLE(STRING, LONG), LONG)) .keyBy(new KeySelector<Tuple2<Tuple2<String, Long>, Long>, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> getKey(Tuple2<Tuple2<String, Long>, Long> value) throws Exception { return value.f0; } }) .sum(1) .print("省份-广告"); env.execute();
}
}
订单支付实时监控
在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。
需求: 来自两条流的订单交易匹配
对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做合并处理。
数据准备
订单数据从OrderLog.csv中读取,交易数据从ReceiptLog.csv中读取
JavaBean类的准备
package com.atguigu.flink.java.chapter_6;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
- @Author lizhenchao@atguigu.cn
- @Date 2020/12/11 20:10
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
private Long orderId;
private String eventType;
private String txId;
private Long eventTime;
}
package com.atguigu.flink.java.chapter_6;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
- @Author lizhenchao@atguigu.cn
- @Date 2020/12/11 20:12
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TxEvent {
private String txId;
private String payChannel;
private Long eventTime;
}
具体实现
package com.atguigu.flink.java.chapter_6;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 22:29
*/
public class Flink06_Project_Order {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 1. 读取Order流
SingleOutputStreamOperator orderEventDS = env
.readTextFile(“input/OrderLog.csv”)
.map(line -> {
String[] datas = line.split(“,”);
return new OrderEvent(
Long.valueOf(datas[0]),
datas[1],
datas[2],
Long.valueOf(datas[3]));}); // 2. 读取交易流 SingleOutputStreamOperator<TxEvent> txDS = env .readTextFile("input/ReceiptLog.csv") .map(line -> { String[] datas = line.split(","); return new TxEvent(datas[0], datas[1], Long.valueOf(datas[2])); }); // 3. 两个流连接在一起 ConnectedStreams<OrderEvent, TxEvent> orderAndTx = orderEventDS.connect(txDS); // 4. 因为不同的数据流到达的先后顺序不一致,所以需要匹配对账信息. 输出表示对账成功与否 orderAndTx .keyBy("txId", "txId") .process(new CoProcessFunction<OrderEvent, TxEvent, String>() { // 存 txId -> OrderEvent Map<String, OrderEvent> orderMap = new HashMap<>(); // 存储 txId -> TxEvent Map<String, TxEvent> txMap = new HashMap<>(); @Override public void processElement1(OrderEvent value, Context ctx, Collector<String> out) throws Exception { // 获取交易信息 if (txMap.containsKey(value.getTxId())) { out.collect("订单: " + value + " 对账成功"); txMap.remove(value.getTxId()); } else { orderMap.put(value.getTxId(), value); } } @Override public void processElement2(TxEvent value, Context ctx, Collector<String> out) throws Exception { // 获取订单信息 if (orderMap.containsKey(value.getTxId())) { OrderEvent orderEvent = orderMap.get(value.getTxId()); out.collect("订单: " + orderEvent + " 对账成功"); orderMap.remove(value.getTxId()); } else { txMap.put(value.getTxId(), value); } } }) .print(); env.execute();
}
}
Flink流处理高阶编程
在上一个章节中,我们已经学习了Flink的基础编程API的使用,接下来,我们来学习Flink编程的高阶部分。所谓的高阶部分内容,其实就是Flink与其他计算框架不相同且占优势的地方,比如Window和Exactly-Once,接下来我们就对这些内容进行详细的学习。
Flink的window机制
窗口概述
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。
在Flink中, 窗口(window)是处理无界流的核心. 窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算.窗口的分类
窗口分为2类:
基于时间的窗口(时间驱动)
基于元素个数的(数据驱动)
基于时间的窗口
时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸.
在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp())
时间窗口又分4种:
滚动窗口(Tumbling Windows)
滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
示例代码:
env
.socketTextStream(“hadoop102”, 9999)
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
Arrays.stream(value.split(“\W+”)).forEach(word -> out.collect(Tuple2.of(word, 1L)));
}
})
.keyBy(t -> t.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(8))) // 添加滚动窗口
.sum(1)
.print();
说明:
时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定.
我们传递给window函数的对象叫窗口分配器.
滑动窗口(Sliding Windows)
与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据.
示例代码:
env
.socketTextStream(“hadoop102”, 9999)
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
Arrays.stream(value.split(“\W+”)).forEach(word -> out.collect(Tuple2.of(word, 1L)));
}
})
.keyBy(t -> t.f0)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 添加滚动窗口
.sum(1)
.print();
env.execute();
会话窗口(Session Windows)
会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口
示例代码:
静态gap
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
动态gap
.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
@Override
public long extract(Tuple2<String, Long> element) { // 返回 gap值, 单位毫秒
return element.f0.length() * 1000;
}
}))
创建原理:
因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction
全局窗口(Global Windows)
全局窗口分配器会分配相同key的所有元素进入同一个 Global window. 这种窗口机制只有指定自定义的触发器时才有用. 否则, 不会做任何计算, 因为这种窗口没有能够处理聚集在一起元素的结束点.
示例代码:
.window(GlobalWindows.create());
基于元素个数的窗口
按照指定的数据条数生成一个Window,与时间无关
分2类:
滚动窗口
默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
实例代码
.countWindow(3)
说明:哪个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.
滑动窗口
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。
实例代码
.countWindow(3, 2)
Window Function
前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素
ReduceFunction(增量聚合函数)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
System.out.println(value1 + " ----- " + value2);
// value1是上次聚合的结果. 所以遇到每个窗口的第一个元素时, 这个函数不会进来
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
})
AggregateFunction(增量聚合函数)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new AggregateFunction<Tuple2<String, Long>, Long, Long>() {
// 创建累加器: 初始化中间值
@Override
public Long createAccumulator() {
System.out.println("createAccumulator");
return 0L;
}
// 累加器操作
@Override
public Long add(Tuple2<String, Long> value, Long accumulator) {
System.out.println("add");
return accumulator + value.f1;
}
// 获取结果
@Override
public Long getResult(Long accumulator) {
System.out.println("getResult");
return accumulator;
}
// 累加器的合并: 只有会话窗口才会调用
@Override
public Long merge(Long a, Long b) {
System.out.println("merge");
return a + b;
}
})
ProcessWindowFunction(全窗口函数)
.process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
// 参数1: key 参数2: 上下文对象 参数3: 这个窗口内所有的元素 参数4: 收集器, 用于向下游传递数据
@Override
public void process(String key,
Context context,
Iterable<Tuple2<String, Long>> elements,
Collector<Tuple2<String, Long>> out) throws Exception {
System.out.println(context.window().getStart());
long sum = 0L;
for (Tuple2<String, Long> t : elements) {
sum += t.f1;
}
out.collect(Tuple2.of(key, sum));
}
})
Keyed vs Non-Keyed Windows
其实, 在用window前首先需要确认应该是在keyBy后的流上用, 还是在没有keyBy的流上使用.
在keyed streams上使用窗口, 窗口计算被并行的运用在多个task上, 可以认为每个task都有自己单独窗口. 正如前面的代码所示.
在非non-keyed stream上使用窗口, 流的并行度只能是1, 所有的窗口逻辑只能在一个单独的task上执行.
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
需要注意的是: 非key分区的流上使用window, 如果把并行度强行设置为>1, 则会抛出异常
Flik中的时间语义与WaterMark
Flink中的时间语义
在Flink的流式操作中, 会涉及不同的时间概念
处理时间(process time)
处理时间是指的执行操作的各个设备的时间
对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟.比如, 一个长度为1个小时的窗口将会包含设备时钟表示的1个小时内所有的数据. 假设应用程序在 9:15am分启动, 第1个小时窗口将会包含9:15am到10:00am所有的数据, 然后下个窗口是10:00am-11:00am, 等等
处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调. 他提供了最好的性能和最低的延迟. 但是, 在分布式和异步的环境下, 处理时间没有办法保证确定性, 容易受到数据传递速度的影响: 事件的延迟和乱序
在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器
事件时间(event time)
事件时间是指的这个事件发生的时间.
在event进入Flink之前, 通常被嵌入到了event中, 一般作为这个event的时间戳存在.
在事件时间体系中, 时间的进度依赖于数据本身, 和任何设备的时间无关. 事件时间程序必须制定如何产生Event Time Watermarks(水印) . 在事件时间体系中, 水印是表示时间进度的标志(作用就相当于现实时间的时钟).
在理想情况下,不管事件时间何时到达或者他们的到达的顺序如何, 事件时间处理将产生完全一致且确定的结果. 事件时间处理会在等待无序事件(迟到事件)时产生一定的延迟。由于只能等待有限的时间,因此这限制了确定性事件时间应用程序的可使用性。
假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,该记录落入该小时,无论它们到达的顺序或处理时间。
在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器
注意:
在1.12之前默认的时间语义是处理时间, 从1.12开始, Flink内部已经把默认的语义改成了事件时间
哪种时间更重要
Flink中的WaterMark
支持event time的流式处理框架需要一种能够测量event time 进度的方式. 比如, 一个窗口算子创建了一个长度为1小时的窗口,那么这个算子需要知道事件时间已经到达了这个窗口的关闭时间, 从而在程序中去关闭这个窗口.
事件时间可以不依赖处理时间来表示时间的进度. 例如, 在程序中, 即使处理时间和事件时间有相同的速度, 事件时间可能会轻微的落后处理时间. 另外一方面,使用事件时间可以在几秒内处理已经缓存在Kafka中多周的数据, 这些数据可以照样被正确处理, 就像实时发生的一样能够进入正确的窗口.
这种在Flink中去测量事件时间的进度的机制就是 watermark(水印). watermark作为数据流的一部分在流动, 并且携带一个时间戳t.
一个Watermark(t)表示在这个流里面事件时间已经到了时间t, 意味着此时, 流中不应该存在这样的数据: 他的时间戳t2<=t (时间比较旧或者等于时间戳)
有序流中的水印
在下面的这个图中, 事件是有序的(按照他们自己的时间戳来看), watermark是流中一个简单的周期性的标记
乱序流中的水印
在下图中, 按照他们时间戳来看, 这些事件是乱序的, 则watermark对于这些乱序的流来说至关重要.
通常情况下, 水印是一种标记, 是流中的一个点, 所有在这个时间戳(水印中的时间戳)前的数据应该已经全部到达. 一旦水印到达了算子, 则这个算子会提高他内部的时钟的值为这个水印的值.
Flink中如何产生水印
在 Flink 中, 水印由应用程序开发人员生成, 这通常需要对相应的领域有 一定的了解。完美的水印永远不会错:时间戳小于水印标记时间的事件不会再出现。在特殊情况下(例如非乱序事件流),最近一次事件的时间戳就可能是完美的水印。
启发式水印则相反,它只估计时间,因此有可能出错, 即迟到的事件 (其时间戳小于水印标记时间)晚于水印出现。针对启发式水印, Flink 提供了处理迟到元素的机制。
设定水印通常需要用到领域知识。举例来说,如果知道事件的迟到时间不会超过 5 秒, 就可以将水印标记时间设为收到的最大时间戳减去 5 秒。 另 一种做法是,采用一个 Flink 作业监控事件流,学习事件的迟到规律,并以此构建水印生成模型。
EventTime和WaterMark的使用
Flink内置了两个WaterMark生成器:
Monotonously Increasing Timestamps(时间戳单调增长:其实就是允许的延迟为0)
WatermarkStrategy.forMonotonousTimestamps();
Fixed Amount of Lateness(允许固定时间的延迟)
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
package com.atguigu.flink.java.chapter_7;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/12 20:29
*/
public class Flink10_Chapter07_OrderedWaterMark {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env .socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳 .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); } }); // 创建水印生产策略 WatermarkStrategy<WaterSensor> wms = WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // // 最大容忍的延迟时间 .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { // 指定时间戳 @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } }); stream .assignTimestampsAndWatermarks(wms) // 指定水印和时间戳 .keyBy(WaterSensor: :getId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { String msg = "当前key: " + key + "窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd()/1000 + ") 一共有 " + elements.spliterator().estimateSize() + "条数据 "; out.collect(msg); } }) .print(); env.execute();
}
}
自定义WatermarkStrategy
有2种风格的WaterMark生产方式: periodic(周期性) and punctuated(间歇性).都需要继承接口: WatermarkGenerator
周期性
package com.atguigu.flink.java.chapter_7;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/12 20:29
*/
public class Flink11_Chapter07_Period {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env .socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳 .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); } }); // 创建水印生产策略 WatermarkStrategy<WaterSensor> myWms = new WatermarkStrategy<WaterSensor>() { @Override public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { System.out.println("createWatermarkGenerator ...."); return new MyPeriod(3); } }.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { System.out.println("recordTimestamp " + recordTimestamp); return element.getTs() * 1000; } }); stream .assignTimestampsAndWatermarks(myWms) .keyBy(WaterSensor::getId) .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(5))) .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { String msg = "当前key: " + key + "窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ") 一共有 " + elements.spliterator().estimateSize() + "条数据 "; out.collect(context.window().toString()); out.collect(msg); } }) .print(); env.execute();
}
public static class MyPeriod implements WatermarkGenerator {
private long maxTs = Long.MIN_VALUE;
// 允许的最大延迟时间 ms
private final long maxDelay;public MyPeriod(long maxDelay) { this.maxDelay = maxDelay * 1000; this.maxTs = Long.MIN_VALUE + this.maxDelay + 1; } // 每收到一个元素, 执行一次. 用来生产WaterMark中的时间戳 @Override public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) { System.out.println("onEvent..." + eventTimestamp); //有了新的元素找到最大的时间戳 maxTs = Math.max(maxTs, eventTimestamp); System.out.println(maxTs); } // 周期性的把WaterMark发射出去, 默认周期是200ms @Override public void onPeriodicEmit(WatermarkOutput output) {
// System.out.println(“onPeriodicEmit…”);
// 周期性的发射水印: 相当于Flink把自己的时钟调慢了一个最大延迟
output.emitWatermark(new Watermark(maxTs - maxDelay - 1));
}
}
}
间歇性
public class Flink12_Chapter07_punctuated {
public static void main(String[] args) throws Exception {
// 省略…
public static class MyPunctuated implements WatermarkGenerator<WaterSensor> {
private long maxTs;
// 允许的最大延迟时间 ms
private final long maxDelay;
public MyPunctuated(long maxDelay) {
this.maxDelay = maxDelay * 1000;
this.maxTs = Long.MIN_VALUE + this.maxDelay + 1;
}
// 每收到一个元素, 执行一次. 用来生产WaterMark中的时间戳
@Override
public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
System.out.println("onEvent..." + eventTimestamp);
//有了新的元素找到最大的时间戳
maxTs = Math.max(maxTs, eventTimestamp);
output.emitWatermark(new Watermark(maxTs - maxDelay - 1));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 不需要实现
}
}
}
多并行度下WaterMark的传递
总结: 多并行度的条件下, 向下游传递WaterMark的时候, 总是以最小的那个WaterMark为准! 木桶原理!
窗口允许迟到的数据
已经添加了wartemark之后, 仍有数据会迟到怎么办? Flink的窗口, 也允许迟到数据.
当触发了窗口计算后, 会先计算当前的结果, 但是此时并不会关闭窗口.以后每来一条迟到数据, 则触发一次这条数据所在窗口计算(增量计算).
那么什么时候会真正的关闭窗口呢? wartermark 超过了窗口结束时间+等待时间
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
注意:
允许迟到只能运用在event time上
侧输出流(sideOutput)
接收窗口关闭之后的迟到数据
允许迟到数据, 窗口也会真正的关闭, 如果还有迟到的数据怎么办? Flink提供了一种叫做侧输出流的来处理关窗之后到达的数据.
package com.atguigu.flink.java.chapter_7;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/12 20:29
*/
public class Flink13_Chapter07_SideOutput {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
System.out.println(env.getConfig());SingleOutputStreamOperator<WaterSensor> stream = env .socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳 .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); } }); // 创建水印生产策略 WatermarkStrategy<WaterSensor> wms = WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 最大容忍的延迟时间 .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { // 指定时间戳 @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } }); SingleOutputStreamOperator<String> result = stream .assignTimestampsAndWatermarks(wms) .keyBy(WaterSensor::getId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3)) .sideOutputLateData(new OutputTag<WaterSensor>("side_1") { }) // 设置侧输出流 .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { String msg = "当前key: " + key + " 窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ") 一共有 " + elements.spliterator().estimateSize() + "条数据" + "watermark: " + context.currentWatermark(); out.collect(context.window().toString()); out.collect(msg); } }); result.print(); result.getSideOutput(new OutputTag<WaterSensor>("side_1"){}).print(); env.execute();
}
}
使用侧输出流把一个流拆成多个流
split算子可以把一个流分成两个流, 从1.12开始已经被移除了. 官方建议我们用侧输出流来替换split算子的功能.
需求: 采集监控传感器水位值,将水位值高于5cm的值输出到side output
SingleOutputStreamOperator result =
env
.socketTextStream(“hadoop102”, 9999) // 在socket终端只输入毫秒级别的时间戳
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(“,”);
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}
})
.keyBy(ws -> ws.getTs())
.process(new KeyedProcessFunction<Long, WaterSensor, WaterSensor>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
out.collect(value);
if (value.getVc() > 5) { //水位大于5的写入到侧输出流
ctx.output(new OutputTag(“警告”) {}, value);
}
}
});
result.print(“主流”);
result.getSideOutput(new OutputTag(“警告”){}).print(“警告”);
ProcessFunction API(底层API)
我们之前学习的转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如MapFunction这样的map转换算子就无法访问时间戳或者当前事件的事件时间。
基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。
Flink提供了8个Process Function:
下面分别给一个示例, 有些其实已经学习过了.
ProcessFunction
env
.socketTextStream(“hadoop102”, 9999)
.map(line -> {
String[] datas = line.split(“,”);
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
})
.process(new ProcessFunction<WaterSensor, String>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
out.collect(value.toString());
}
})
.print();
KeyedProcessFunction
env
.socketTextStream(“hadoop102”, 9999)
.map(line -> {
String[] datas = line.split(“,”);
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
})
.keyBy(ws -> ws.getId())
.process(new KeyedProcessFunction<String, WaterSensor, String>() { // 泛型1:key的类型 泛型2:输入类型 泛型3:输出类型
@Override
public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
System.out.println(ctx.getCurrentKey());
out.collect(value.toString());
}
})
.print();
CoProcessFunction
DataStreamSource intStream = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource stringStream = env.fromElements(“a”, “b”, “c”);
ConnectedStreams<Integer, String> cs = intStream.connect(stringStream);
cs
.process(new CoProcessFunction<Integer, String, String>() {
@Override
public void processElement1(Integer value, Context ctx, Collector out) throws Exception {
out.collect(value.toString());
}
@Override
public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
out.collect(value);
}
})
.print();
ProcessJoinFunction
SingleOutputStreamOperator s1 = env
.socketTextStream(“hadoop102”, 8888) // 在socket终端只输入毫秒级别的时间戳
.map(value -> {
String[] datas = value.split(“,”);
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
SingleOutputStreamOperator s2 = env
.socketTextStream(“hadoop102”, 9999) // 在socket终端只输入毫秒级别的时间戳
.map(value -> {
String[] datas = value.split(“,”);
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
s1.join(s2)
.where(WaterSensor::getId)
.equalTo(WaterSensor::getId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 必须使用窗口
.apply(new JoinFunction<WaterSensor, WaterSensor, String>() {
@Override
public String join(WaterSensor first, WaterSensor second) throws Exception {
return "first: " + first + ", second: " + second;
}
})
.print();
BroadcastProcessFunction
后面专门讲解
KeyedBroadcastProcessFunction
keyBy之后使用
ProcessWindowFunction
添加窗口之后使用
ProcessAllWindowFunction
全窗口函数之后使用
定时器
基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行.
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间
currentWatermark(): Long 返回当前watermark的时间戳
registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
基于处理时间的定时器
SingleOutputStreamOperator stream = env
.socketTextStream(“hadoop102”, 9999) // 在socket终端只输入毫秒级别的时间戳
.map(value -> {
String[] datas = value.split(“,”);
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
stream
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
// 处理时间过后5s后触发定时器
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
out.collect(value.toString());
}
// 定时器被触发之后, 回调这个方法
// 参数1: 触发器被触发的时间
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
System.out.println(timestamp);
out.collect("我被触发了....");
}
})
.print();
基于事件时间的定时器
在测试的时候, 脑子里面要上课想着: 时间进展依据的是watermark
SingleOutputStreamOperator stream = env
.socketTextStream(“hadoop102”, 9999) // 在socket终端只输入毫秒级别的时间戳
.map(value -> {
String[] datas = value.split(“,”);
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
WatermarkStrategy wms = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000);
stream
.assignTimestampsAndWatermarks(wms)
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
System.out.println(ctx.timestamp());
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000);
out.collect(value.toString());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
System.out.println("定时器被触发.....");
}
})
.print();
定时器练习
需求:
监控水位传感器的水位值,如果水位值在5s之内(event time)连续上升,则报警。
SingleOutputStreamOperator stream = env
.socketTextStream(“hadoop102”, 9999) // 在socket终端只输入毫秒级别的时间戳
.map(value -> {
String[] datas = value.split(“,”);
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
WatermarkStrategy wms = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000);
stream
.assignTimestampsAndWatermarks(wms)
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
int lastVc = 0;
long timerTS = Long.MIN_VALUE;
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
if (value.getVc() > lastVc) {
if (timerTS == Long.MIN_VALUE) {
System.out.println("注册....");
timerTS = ctx.timestamp() + 5000L;
ctx.timerService().registerEventTimeTimer(timerTS);
}
} else {
ctx.timerService().deleteEventTimeTimer(timerTS);
timerTS = Long.MIN_VALUE;
}
lastVc = value.getVc();
System.out.println(lastVc);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect(ctx.getCurrentKey() + " 报警!!!!");
timerTS = Long.MIN_VALUE;
}
})
.print();
Flink状态编程
有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。
SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度.
Flink的状态管理是它的优势之一.
什么是状态
在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作).
那些需要记住多个事件信息的操作就是有状态的.
流式计算分为无状态计算和有状态计算两种情况。
无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。
有状态的计算则会基于多个事件输出结果。以下是一些例子。例如,计算过去一小时的平均水位,就是有状态的计算。所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20cm以上的水位差读数,则发出警告,这是有状态的计算。流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。
为什么需要管理状态
下面的几个场景都需要使用流处理的状态功能:
去重
数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
检测
检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
聚合
对一个时间窗口内的数据进行聚合分析,分析一个小时内水位的情况
更新机器学习模型
在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。
Flink中的状态分类
Flink包括两种基本类型的状态Managed State和Raw State
Managed State Raw State
状态管理方式 Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩 用户自己管理
状态数据结构 Flink提供多种常用数据结构, 例如:ListState, MapState等 字节数组: byte[]
使用场景 绝大数Flink算子 所有算子
注意:
从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State一般是在已有算子和Managed State不够用时,用户自定义算子时使用。
在我们平时的使用中Managed State已经足够我们使用, 下面重点学习Managed State
Managed State的分类
对Managed State继续细分,它又有2种类型
Operator State(算子状态)
Keyed State(键控状态)
Operator State Keyed State
适用用算子类型 可用于所有算子: 常用于source, sink, 例如 FlinkKafkaConsumer 只适用于KeyedStream上的算子
状态分配 一个算子的子任务对应一个状态 一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State
创建和访问方式 实现CheckpointedFunction或ListCheckpointed(已经过时)接口 重写RichFunction, 通过里面的RuntimeContext访问
横向扩展 并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量 并发改变, State随着Key在实例间迁移
支持的数据结构 ListState和BroadCastState ValueState, ListState,MapState ReduceState, AggregatingState
算子状态的使用
Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。
注意: 算子子任务之间的状态不能互相访问
Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。
Flink为算子状态提供三种基本数据结构:
列表状态(List state)
将状态表示为一组数据的列表
联合列表状态(Union list state)
也是将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。
广播状态(Broadcast state)
是一种特殊的算子状态. 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
案例1: 列表状态
在map算子中计算数据的个数
package com.atguigu.flink.java.chapter_7.state;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/2 11:51
*/
public class Flink01_State_Operator {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment()
.setParallelism(3);
env
.socketTextStream(“hadoop102”, 9999)
.map(new MyCountMapper())
.print();env.execute();
}
private static class MyCountMapper implements MapFunction<String, Long>, CheckpointedFunction {
private Long count = 0L;
private ListState state;@Override public Long map(String value) throws Exception { count++; return count; } // 初始化时会调用这个方法,向本地状态中填充数据. 每个子任务调用一次 @Override public void initializeState(FunctionInitializationContext context) throws Exception { System.out.println("initializeState..."); state = context .getOperatorStateStore() .getListState(new ListStateDescriptor<Long>("state", Long.class)); for (Long c : state.get()) { count += c; } } // Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化 @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { System.out.println("snapshotState..."); state.clear(); state.add(count); }
}
}
案例2: 广播状态
从版本1.5.0开始,Apache Flink具有一种新的状态,称为广播状态。
广播状态被引入以支持这样的用例:来自一个流的一些数据需要广播到所有下游任务,在那里它被本地存储,并用于处理另一个流上的所有传入元素。作为广播状态自然适合出现的一个例子,我们可以想象一个低吞吐量流,其中包含一组规则,我们希望根据来自另一个流的所有元素对这些规则进行评估。考虑到上述类型的用例,广播状态与其他算子状态的区别在于:
- 它是一个map格式
- 它只对输入有广播流和无广播流的特定算子可用
- 这样的算子可以具有不同名称的多个广播状态。
package com.atguigu.flink.java.chapter_7.state;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/2 11:51
*/
public class Flink01_State_Operator_3 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment()
.setParallelism(3);
DataStreamSource dataStream = env.socketTextStream(“hadoop102”, 9999);
DataStreamSource controlStream = env.socketTextStream(“hadoop102”, 8888);MapStateDescriptor<String, String> stateDescriptor = new MapStateDescriptor<>("state", String.class, String.class); // 广播流 BroadcastStream<String> broadcastStream = controlStream.broadcast(stateDescriptor); dataStream .connect(broadcastStream) .process(new BroadcastProcessFunction<String, String, String>() { @Override public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception { // 从广播状态中取值, 不同的值做不同的业务 ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor); if ("1".equals(state.get("switch"))) { out.collect("切换到1号配置...."); } else if ("0".equals(state.get("switch"))) { out.collect("切换到0号配置...."); } else { out.collect("切换到其他配置...."); } } @Override public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception { BroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor); // 把值放入广播状态 state.put("switch", value); } }) .print(); env.execute();
}
}
键控状态的使用
键控状态是根据输入数据流中定义的键(key)来维护和访问的。
Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。
Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。
键控状态支持的数据类型
ValueState
保存单个值. 每个有key有一个状态值. 设置使用 update(T), 获取使用 T value()
ListState:
保存元素列表.
添加元素: add(T) addAll(List)
获取元素: Iterable get()
覆盖所有元素: update(List)
ReducingState:
存储单个值, 表示把所有元素的聚合结果添加到状态中. 与ListState类似, 但是当使用add(T)的时候ReducingState会使用指定的ReduceFunction进行聚合.
AggregatingState<IN, OUT>:
存储单个值. 与ReducingState类似, 都是进行聚合. 不同的是, AggregatingState的聚合的结果和元素类型可以不一样.
MapState<UK, UV>:
存储键值对列表.
添加键值对: put(UK, UV) or putAll(Map<UK, UV>)
根据key获取值: get(UK)
获取所有: entries(), keys() and values()
检测是否为空: isEmpty()
注意:
所有的类型都有clear(), 清空当前key的状态
这些状态对象仅用于用户与状态进行交互.
状态不是必须存储到内存, 也可以存储在磁盘或者任意其他地方
从状态获取的值与输入元素的key相关
案例1:ValueState
检测传感器的水位值,如果连续的两个水位值超过10,就输出报警。
package com.atguigu.flink.java.chapter_7.state;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/2 11:51
*/
public class Flink02_State_Keyed_Value {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment()
.setParallelism(3);
env
.socketTextStream(“hadoop102”, 9999)
.map(value -> {
String[] datas = value.split(“,”);
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}) .keyBy(WaterSensor::getId) .process(new KeyedProcessFunction<String, WaterSensor, String>() { private ValueState<Integer> state; @Override public void open(Configuration parameters) throws Exception { state = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("state", Integer.class)); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { Integer lastVc = state.value() == null ? 0 : state.value(); if (Math.abs(value.getVc() - lastVc) >= 10) { out.collect(value.getId() + " 红色警报!!!"); } state.update(value.getVc()); } }) .print(); env.execute();
}
}
案例2:ListState
针对每个传感器输出最高的3个水位值
package com.atguigu.flink.java.chapter_7.state;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/2 11:51
*/
public class Flink02_State_Keyed_ListState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment()
.setParallelism(3);
env
.socketTextStream(“hadoop102”, 9999)
.map(value -> {
String[] datas = value.split(“,”);
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}) .keyBy(WaterSensor::getId) .process(new KeyedProcessFunction<String, WaterSensor, List<Integer>>() { private ListState<Integer> vcState; @Override public void open(Configuration parameters) throws Exception { vcState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcState", Integer.class)); } @Override public void processElement(WaterSensor value, Context ctx, Collector<List<Integer>> out) throws Exception { vcState.add(value.getVc()); //1. 获取状态中所有水位高度, 并排序 List<Integer> vcs = new ArrayList<>(); for (Integer vc : vcState.get()) { vcs.add(vc); } // 2. 降序排列 vcs.sort((o1, o2) -> o2 - o1); // 3. 当长度超过3的时候移除最后一个 if (vcs.size() > 3) { vcs.remove(3); } vcState.update(vcs); out.collect(vcs); } }) .print(); env.execute();
}
}
案例3:ReducingState
计算每个传感器的水位和
.process(new KeyedProcessFunction<String, WaterSensor, Integer>() {
private ReducingState sumVcState;
@Override
public void open(Configuration parameters) throws Exception {
sumVcState = this
.getRuntimeContext()
.getReducingState(new ReducingStateDescriptor(“sumVcState”, Integer::sum, Integer.class));
}@Override
public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
sumVcState.add(value.getVc());
out.collect(sumVcState.get());
}
})
案例4:AggregatingState
计算每个传感器的平均水位
.process(new KeyedProcessFunction<String, WaterSensor, Double>() {private AggregatingState<Integer, Double> avgState;
@Override
public void open(Configuration parameters) throws Exception {
AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double> aggregatingStateDescriptor = new AggregatingStateDescriptor<>(“avgState”, new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return Tuple2.of(0, 0);
}@Override public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) { return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1); } @Override public Double getResult(Tuple2<Integer, Integer> accumulator) { return accumulator.f0 * 1D / accumulator.f1; } @Override public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) { return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1); } }, Types.TUPLE(Types.INT, Types.INT)); avgState = getRuntimeContext().getAggregatingState(aggregatingStateDescriptor);
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
avgState.add(value.getVc());
out.collect(avgState.get());
}
})
案例5:MapState
去重: 去掉重复的水位值. 思路: 把水位值作为MapState的key来实现去重, value随意
.process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
private MapState<Integer, String> mapState;
@Override
public void open(Configuration parameters) throws Exception {
mapState = this
.getRuntimeContext()
.getMapState(new MapStateDescriptor<Integer, String>(“mapState”, Integer.class, String.class));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
if (!mapState.contains(value.getVc())) {
out.collect(value);
mapState.put(value.getVc(), “随意”);
}
}
})
状态后端
每传入一条数据,有状态的算子任务都会读取和更新状态。 由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。
状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)
状态后端主要负责两件事:
本地(taskmanager)的状态管理
将检查点(checkpoint)状态写入远程存储
状态后端的分类
状态后端作为一个可插入的组件, 没有固定的配置, 我们可以根据需要选择一个合适的状态后端.
Flink提供了3种状态后端:
MemoryStateBackend
内存级别的状态后端(默认),
存储方式:本地状态存储在TaskManager的内存中, checkpoint 存储在JobManager的内存中.
特点:快速, 低延迟, 但不稳定
使用场景: 1. 本地测试 2. 几乎无状态的作业(ETL) 3. JobManager不容易挂, 或者挂了影响不大. 4. 不推荐在生产环境下使用
FsStateBackend
存储方式: 本地状态在TaskManager内存, Checkpoint时, 存储在文件系统(hdfs)中
特点: 拥有内存级别的本地访问速度, 和更好的容错保证
使用场景: 1. 常规使用状态的作业. 例如分钟级别窗口聚合, join等 2. 需要开启HA的作业 3. 可以应用在生产环境中
RocksDBStateBackend
将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储)
存储方式: 1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘) 2. Checkpoint在外部文件系统(hdfs)中.
使用场景: 1. 超大状态的作业, 例如天级的窗口聚合 2. 需要开启HA的作业 3. 对读写状态性能要求不高的作业 4. 可以使用在生产环境
配置状态后端
全局配置状态后端
在flink-conf.yaml文件中设置默认的全局后端
在代码中配置状态后端
可以在代码中单独为这个Job设置状态后端.
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new FsStateBackend(“hdfs://hadoop162:8020/flink/checkpoints/fs”));
如果要使用RocksDBBackend, 需要先引入依赖:
org.apache.flink
flink-statebackend-rocksdb_
s
c
a
l
a
.
b
i
n
a
r
y
.
v
e
r
s
i
o
n
<
/
a
r
t
i
f
a
c
t
I
d
>
<
v
e
r
s
i
o
n
>
{scala.binary.version}</artifactId> <version>
scala.binary.version</artifactId><version>{flink.version}
env.setStateBackend(new RocksDBStateBackend(“hdfs://hadoop162:8020/flink/checkpoints/rocksdb”));
Flink的容错机制
状态的一致性
当在分布式系统中引入状态时,自然也引入了一致性问题。
一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?
一致性级别
在流处理中,一致性可以分为3个级别:
at-most-once(最多一次):
这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。
at-least-once(至少一次):
这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
exactly-once(严格一次):
这指的是系统保证在发生故障后得到的计数结果与正确值一致.既不多算也不少算
曾经,at-least-once非常流行。第一代流处理器(如Storm和Samza)刚问世时只保证at-least-once,原因有二:
保证exactly-once的系统实现起来更复杂。这在基础架构层(决定什么代表正确,以及exactly-once的范围是什么)和实现层都很有挑战性
流处理系统的早期用户愿意接受框架的局限性,并在应用层想办法弥补(例如使应用程序具有幂等性,或者用批量计算层再做一遍计算)。
最先保证exactly-once的系统(Storm Trident和Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证exactly-once,这些系统无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。曾经,用户不得不在保证exactly-once与获得低延迟和效率之间权衡利弊。Flink避免了这种权衡。
Flink的一个重大价值在于,它既保证了exactly-once,又具有低延迟和高吞吐的处理能力。
从根本上说,Flink通过使自身满足所有需求来避免权衡,它是业界的一次意义重大的技术飞跃。尽管这在外行看来很神奇,但是一旦了解,就会恍然大悟。
端到端的状态一致性
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。
具体划分如下:
source端
需要外部源可重设数据的读取位置.目前我们使用的Kafka Source具有这种特性: 读取数据的时候可以指定offset
flink内部
依赖checkpoint机制
sink端
需要保证从故障恢复时,数据不会重复写入外部系统. 有2种实现形式:
幂等(Idempotent)写入
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
事务性(Transactional)写入
需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)
Checkpoint原理
Flink具体如何保证exactly-once呢? 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。
假设你和两位朋友正在数项链上有多少颗珠子,如下图所示。你捏住珠子,边数边拨,每拨过一颗珠子就给总数加一。你的朋友也这样数他们手中的珠子。当你分神忘记数到哪里时,怎么办呢? 如果项链上有很多珠子,你显然不想从头再数一遍,尤其是当三人的速度不一样却又试图合作的时候,更是如此(比如想记录前一分钟三人一共数了多少颗珠子,回想一下一分钟滚动窗口)。
于是,你想了一个更好的办法: 在项链上每隔一段就松松地系上一根有色皮筋,将珠子分隔开; 当珠子被拨动的时候,皮筋也可以被拨动; 然后,你安排一个助手,让他在你和朋友拨到皮筋时记录总数。用这种方法,当有人数错时,就不必从头开始数。相反,你向其他人发出错误警示,然后你们都从上一根皮筋处开始重数,助手则会告诉每个人重数时的起始数值,例如在粉色皮筋处的数值是多少。
Flink检查点的作用就类似于皮筋标记。数珠子这个类比的关键点是: 对于指定的皮筋而言,珠子的相对位置是确定的; 这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态,如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单。
Flink的检查点算法
checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性.
快照的实现算法:
简单算法–暂停应用, 然后开始做检查点, 再重新恢复应用
Flink的改进Checkpoint算法. Flink的checkpoint机制原理来自"Chandy-Lamport algorithm"算法(分布式快照算)的一种变体: 异步 barrier 快照(asynchronous barrier snapshotting)
每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。
理解Barrier
流的barrier是Flink的Checkpoint中的一个核心概念. 多个barrier被插入到数据流中, 然后作为数据流的一部分随着数据流动(有点类似于Watermark).这些barrier不会跨越流中的数据.
每个barrier会把数据流分成两部分: 一部分数据进入当前的快照 , 另一部分数据进入下一个快照 . 每个barrier携带着快照的id. barrier 不会暂停数据的流动, 所以非常轻量级. 在流中, 同一时间可以有来源于多个不同快照的多个barrier, 这个意味着可以并发的出现不同的快照.
Flink的检查点制作过程
第一步: Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint. 然后Source Task会在数据流中安插CheckPoint barrier
第二步: source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有进来的 barrier 才会执行相应的 Checkpoint(barrier对齐, 但是新版本有一种新的: barrier)
第三步: 当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给 Checkpoint coordinator。
第四步: 下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。
第五步: 同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。
第六步: 最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。
严格一次语义: barrier对齐
在多并行度下, 如果要实现严格一次, 则要执行barrier对齐.
当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行 barrier 对齐(barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。
https://ci.apache.org/projects/flink/flink-docs-release-1.12/fig/stream_aligning.svg
当operator收到数字流的barrier n时, 它就不能处理(但是可以接收)来自该流的任何数据记录,直到它从字母流所有输入接收到 barrier n 为止。否则,它会混合属于快照 n 的记录和属于快照 n + 1 的记录。
接收到 barrier n 的流(数字流)暂时被搁置。从这些流接收的记录入输入缓冲区, 不会被处理。
图一中的 Checkpoint barrier n之后的数据 123已结到达了算子, 存入到输入缓冲区没有被处理, 只有等到字母流的Checkpoint barrier n到达之后才会开始处理.
一旦最后所有输入流都接收到 barrier n,Operator 就会把缓冲区中 pending 的输出数据发出去,然后把 CheckPoint barrier n 接着往下游发送。这里还会对自身进行快照。
至少一次语义: barrier不对齐
前面介绍了barrier对齐, 如果barrier不对齐会怎么样? 会重复消费, 就是至少一次语义.
假设不对齐, 在字母流的Checkpoint barrier n到达前, 已经处理了1 2 3. 等字母流Checkpoint barrier n到达之后, 会做Checkpoint n. 假设这个时候程序异常错误了, 则重新启动的时候会Checkpoint n之后的数据重新计算. 1 2 3 会被再次被计算, 所以123出现了重复计算.
Savepoint原理
Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints)
原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点
Flink不会自动创建保存点,因此用户(或外部调度程序)必须明确地触发创建操作
保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等
checkpoint和savepoint的区别
Savepoint Checkpoint
Savepoint是由命令触发, 由用户创建和删除 Checkpoint被保存在用户指定的外部路径中, flink自动触发
保存点存储在标准格式存储中,并且可以升级作业版本并可以更改其配置。
当作业失败或被取消时,将保留外部存储的检查点。
用户必须提供用于还原作业状态的保存点的路径。 用户必须提供用于还原作业状态的检查点的路径。
Kafka+Flink+Kafka 实现端到端严格一次
我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?
内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证部的状态一致性
source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction
内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。
具体的两阶段提交步骤总结如下:
某个checkpoint的第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”(第一阶段提交)
jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子状态后端会进行相应进行checkpoint,并通知 jobmanagerr
sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据(第二阶段提交)
外部kafka关闭事务,提交的数据可以正常消费了
在代码中测试Checkpoint
package com.atguigu.flink.java.chapter_7.state;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import java.util.Properties;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/2 11:51
*/
public class Flink04_State_Checkpoint {
public static void main(String[] args) throws Exception {
System.setProperty(“HADOOP_USER_NAME”, “atguigu”);Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092"); properties.setProperty("group.id", "Flink01_Source_Kafka"); properties.setProperty("auto.offset.reset", "latest"); StreamExecutionEnvironment env = StreamExecutionEnvironment .createLocalEnvironmentWithWebUI(new Configuration()) .setParallelism(3); env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop162:8020/flink/checkpoints/rocksdb")); // 每 1000ms 开始一次 checkpoint env.enableCheckpointing(1000); // 高级选项: // 设置模式为精确一次 (这是默认值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确认 checkpoints 之间的时间会进行 500 ms env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // Checkpoint 必须在一分钟内完成,否则就会被抛弃 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只允许一个 checkpoint 进行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 开启在 job 中止后仍然保留的 externalized checkpoints env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env .addSource(new FlinkKafkaConsumer<>("sensor", new SimpleStringSchema(), properties)) .map(value -> { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); }) .keyBy(WaterSensor::getId) .process(new KeyedProcessFunction<String, WaterSensor, String>() { private ValueState<Integer> state; @Override public void open(Configuration parameters) throws Exception { state = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("state", Integer.class)); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { Integer lastVc = state.value() == null ? 0 : state.value(); if (Math.abs(value.getVc() - lastVc) >= 10) { out.collect(value.getId() + " 红色警报!!!"); } state.update(value.getVc()); } }) .addSink(new FlinkKafkaProducer<String>("hadoop162:9092", "alert", new SimpleStringSchema())); env.execute();
}
}从checkpoint恢复数据
bin/flink run -c com.atguigu.chapter07.state.Flink07_Kafka_Flink_Kafka -s hdfs://hadoop162:8020/flink/fs/aa62c578adb3c37056280e0c74f3d689/chk-9 flink1026-1.0-SNAPSHOT-jar-with-dependencies.jar
手动savepoint
bin/flink savepoint jobid hdfs://hadoop162:8020/savepoint
从savepoint恢复数据
与checkpoint恢复数据一样
Flink流处理高阶编程实战
基于埋点日志数据的网络流量统计
指定时间范围内网站总浏览量(PV)的统计
实现一个网站总浏览量的统计。我们可以设置滚动时间窗口,实时统计每小时内的网站PV。此前我们已经完成了该需求的流数据操作,当前需求是在之前的基础上增加了窗口信息
package com.atguigu.flink.java.chapter_8;
import com.atguigu.flink.java.chapter_6.UserBehavior;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/4 15:27
*/
public class Flink01_Project_Product_PV {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建WatermarkStrategy WatermarkStrategy<UserBehavior> wms = WatermarkStrategy .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() { @Override public long extractTimestamp(UserBehavior element, long recordTimestamp) { return element.getTimestamp() * 1000L; } }); env .readTextFile("input/UserBehavior.csv") .map(line -> { // 对数据切割, 然后封装到POJO中 String[] split = line.split(","); return new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4])); }) .filter(behavior -> "pv".equals(behavior.getBehavior())) //过滤出pv行为 .assignTimestampsAndWatermarks(wms) // 添加 Watermark .map(behavior -> Tuple2.of("pv", 1L)) .returns(Types.TUPLE(Types.STRING, Types.LONG)) // 使用Tuple类型, 方便后面求和 .keyBy(value -> value.f0) // keyBy: 按照key分组 .window(TumblingEventTimeWindows.of(Time.minutes(60))) // 分配窗口 .sum(1) // 求和 .print(); env.execute();
}
}
指定时间范围内网站独立访客数(UV)的统计
package com.atguigu.flink.java.chapter_8;
import com.atguigu.flink.java.chapter_6.UserBehavior;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/4 15:27
*/
public class Flink02_Project_Product_UV {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建WatermarkStrategy WatermarkStrategy<UserBehavior> wms = WatermarkStrategy .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() { @Override public long extractTimestamp(UserBehavior element, long recordTimestamp) { return element.getTimestamp() * 1000L; } }); env .readTextFile("input/UserBehavior.csv") .map(line -> { // 对数据切割, 然后封装到POJO中 String[] split = line.split(","); return new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4])); }) .filter(behavior -> "pv".equals(behavior.getBehavior())) //过滤出pv行为 .assignTimestampsAndWatermarks(wms) .keyBy(UserBehavior::getBehavior) .window(TumblingEventTimeWindows.of(Time.minutes(60))) .process(new ProcessWindowFunction<UserBehavior, Long, String, TimeWindow>() { private MapState<Long, String> userIdState; @Override public void open(Configuration parameters) throws Exception { userIdState = getRuntimeContext() .getMapState(new MapStateDescriptor<Long, String>("userIdState", Long.class, String.class)); } @Override public void process(String key, Context context, Iterable<UserBehavior> elements, Collector<Long> out) throws Exception { userIdState.clear(); for (UserBehavior ub : elements) { userIdState.put(ub.getUserId(), "随意"); } out.collect(userIdState.keys().spliterator().estimateSize()); } }) .print(); env.execute();
}
}
电商数据分析
电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技术进行深入挖掘和分析,得到感兴趣的商业指标并增强对风险的控制。
电商用户行为数据多样,整体可以分为用户行为习惯数据和业务行为数据两大类。
用户的行为习惯数据包括了用户的登录方式、上线的时间点及时长、点击和浏览页面、页面停留时间以及页面跳转等等,我们可以从中进行流量统计和热门商品的统计,也可以深入挖掘用户的特征;这些数据往往可以从web服务器日志中直接读取到。
而业务行为数据就是用户在电商平台中针对每个业务(通常是某个具体商品)所作的操作,我们一般会在业务系统中相应的位置埋点,然后收集日志进行分析。
实时热门商品统计
需求分析
每隔5分钟输出最近1小时内点击量最多的前N个商品
最近一小时: 窗口长度
每隔5分钟: 窗口滑动步长
时间: 使用event-time
数据准备
这里依然采用UserBehavior.csv作为数据源,通过采集数据统计商品点击信息。
pojo类:
package com.atguigu.flink.java.chapter_8;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
- @Author lizhenchao@atguigu.cn
- @Date 2021/1/4 22:01
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HotItem {
private Long itemId;
private Long count;
private Long windowEndTime;
}
具体实现代码
package com.atguigu.flink.java.chapter_8;
import com.atguigu.flink.java.chapter_6.UserBehavior;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/4 15:27
*/
public class Flink03_Project_Product_TopN {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建WatermarkStrategy WatermarkStrategy<UserBehavior> wms = WatermarkStrategy.<UserBehavior>forBoundedOutOfOrderness( Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() { @Override public long extractTimestamp(UserBehavior element, long recordTimestamp) { return element.getTimestamp() * 1000L; } }); env .readTextFile("input/UserBehavior.csv") .map(line -> { // 对数据切割, 然后封装到POJO中 String[] split = line.split(","); return new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4])); }) .assignTimestampsAndWatermarks(wms) // 添加Watermark .filter(data -> "pv".equals(data.getBehavior())) // 过滤出来点击数据 .keyBy(UserBehavior::getItemId) // 按照产品id进行分组 .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5))) // 设置数据的窗口范围 .aggregate(new AggregateFunction<UserBehavior, Long, Long>() { @Override public Long createAccumulator() { return 0L; } @Override public Long add(UserBehavior value, Long accumulator) { return accumulator + 1L; } @Override public Long getResult(Long accumulator) { return accumulator; } @Override public Long merge(Long a, Long b) { return a + b; } }, new ProcessWindowFunction<Long, HotItem, Long, TimeWindow>() { @Override public void process(Long key, Context context, Iterable<Long> elements, Collector<HotItem> out) throws Exception { out.collect(new HotItem(key, elements.iterator().next(), context.window().getEnd())); } }) .keyBy(HotItem::getWindowEndTime) // 需要统计窗口内的名次, 则需要把属于同一窗内的元素放在一起 .process(new KeyedProcessFunction<Long, HotItem, String>() { private ListState<HotItem> hotItems; private ValueState<Long> triggerTS; @Override public void open(Configuration parameters) throws Exception { hotItems = getRuntimeContext() .getListState(new ListStateDescriptor<HotItem>("hotItems", HotItem.class)); triggerTS = getRuntimeContext().getState(new ValueStateDescriptor<Long>("triggerTS", Long.class)); } @Override public void processElement(HotItem value, Context ctx, Collector<String> out) throws Exception { hotItems.add(value); if (triggerTS.value() == null) { ctx.timerService().registerEventTimeTimer(value.getWindowEndTime() + 1L); triggerTS.update(value.getWindowEndTime()); } } // 等属于某个窗口的所有商品信息来了之后再开始计算topN @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { Iterable<HotItem> hotItems = this.hotItems.get(); // 存储最终的结果 ArrayList<HotItem> result = new ArrayList<>(); for (HotItem hotItem : hotItems) { result.add(hotItem); } this.hotItems.clear(); triggerTS.clear(); // 对result 排序取前3 result.sort((o1, o2) -> o2.getCount().intValue() - o1.getCount().intValue()); StringBuilder sb = new StringBuilder(); sb.append("窗口结束时间: " + (timestamp - 1) + "\n"); sb.append("---------------------------------\n"); for (int i = 0; i < 3; i++) { sb.append(result.get(i) + "\n"); } sb.append("---------------------------------\n\n"); out.collect(sb.toString()); } }).setParallelism(1) .print(); env.execute();
}
}
基于服务器log的热门页面浏览量统计
对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从web服务器的日志中提取出来。
我们在这里先实现“热门页面浏览数”的统计,也就是读取服务器日志中的每一行log,统计在一段时间内用户访问每一个url的次数,然后排序输出显示。
具体做法为:每隔5秒,输出最近10分钟内访问量最多的前N个URL。可以看出,这个需求与之前“实时热门商品统计”非常类似,所以我们完全可以借鉴此前的代码。
具体实现代码
package com.atguigu.flink.java.chapter_8;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
- @Author lizhenchao@atguigu.cn
- @Date 2021/1/5 22:14
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PageCount {
private String url;
private Long count;
private Long windowEnd;
}
package com.atguigu.flink.java.chapter_8;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.TreeSet;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/4 15:27
*/
public class Flink04_Project_Page_TopN {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);// 创建WatermarkStrategy WatermarkStrategy<ApacheLog> wms = WatermarkStrategy .<ApacheLog>forBoundedOutOfOrderness(Duration.ofSeconds(60)) .withTimestampAssigner(new SerializableTimestampAssigner<ApacheLog>() { @Override public long extractTimestamp(ApacheLog element, long recordTimestamp) { return element.getEventTime(); } }); env .readTextFile("input/apache.log") .map(line -> { String[] data = line.split(" "); SimpleDateFormat df = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss"); return new ApacheLog(data[0], df.parse(data[3]).getTime(), data[5], data[6]); }) .assignTimestampsAndWatermarks(wms) .keyBy(ApacheLog::getUrl) .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(5))) .aggregate(new AggregateFunction<ApacheLog, Long, Long>() { @Override public Long createAccumulator() { return 0L; } @Override public Long add(ApacheLog value, Long accumulator) { return accumulator + 1L; } @Override public Long getResult(Long accumulator) { return accumulator; } @Override public Long merge(Long a, Long b) { return a + b; } }, new ProcessWindowFunction<Long, PageCount, String, TimeWindow>() { // <url, count, endWindow> @Override public void process(String key, Context context, Iterable<Long> elements, Collector<PageCount> out) throws Exception { out.collect(new PageCount(key, elements.iterator().next(), context.window().getEnd())); } }) .keyBy(PageCount::getWindowEnd) .process(new KeyedProcessFunction<Long, PageCount, String>() { private ValueState<Long> timerTs; private ListState<PageCount> pageState; @Override public void open(Configuration parameters) throws Exception { pageState = getRuntimeContext().getListState(new ListStateDescriptor<PageCount>("pageState", PageCount.class)); timerTs = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerTs", Long.class)); } @Override public void processElement(PageCount value, Context ctx, Collector<String> out) throws Exception { pageState.add(value); if (timerTs.value() == null) { ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 10L); timerTs.update(value.getWindowEnd()); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { // 换个排序的思路: 使用TreeSet的自动排序功能 TreeSet<PageCount> pageCounts = new TreeSet<>((o1, o2) -> { if (o1.getCount() < o2.getCount()) return 1; // else if(o1.getCount() - o2.getCount() == 0) return 0; else return -1; }); for (PageCount pageCount : pageState.get()) { pageCounts.add(pageCount); if (pageCounts.size() > 3) { // 如果长度超过N, 则删除最后一个, 让长度始终保持N pageCounts.pollLast(); } } StringBuilder sb = new StringBuilder(); sb.append("窗口结束时间: " + (timestamp - 10) + "\n"); sb.append("---------------------------------\n"); for (PageCount pageCount : pageCounts) { sb.append(pageCount + "\n"); } sb.append("---------------------------------\n\n"); out.collect(sb.toString()); } }) .print(); env.execute();
}
}
页面广告分析
页面广告点击量统计
电商网站的市场营销商业指标中,除了自身的APP推广,还会考虑到页面上的广告投放(包括自己经营的产品和其它网站的广告)。所以广告相关的统计分析,也是市场营销的重要指标。
对于广告的统计,最简单也最重要的就是页面广告的点击量,网站往往需要根据广告点击量来制定定价策略和调整推广方式,而且也可以借此收集用户的偏好信息。更加具体的应用是,我们可以根据用户的地理位置进行划分,从而总结出不同省份用户对不同广告的偏好,这样更有助于广告的精准投放。
在之前的需求实现中,已经统计的广告的点击次数总和,但是没有实现窗口操作,并且也未增加排名处理.
这次添加窗口, 并增加排名
@NoArgsConstructor
@AllArgsConstructor
@Data
public class AdsClickLog {
private long userId;
private long adsId;
private String province;
private String city;
private Long timestamp;
}
package com.atguigu.flink.java.chapter_8;
import com.atguigu.flink.java.chapter_6.AdsClickLog;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 22:29
*/
public class Flink05_Project_AdsClick {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 创建WatermarkStrategy
WatermarkStrategy wms = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(AdsClickLog element, long recordTimestamp) {
return element.getTimestamp() * 1000L;
}
});
env
.readTextFile(“input/AdClickLog.csv”)
.map(line -> {
String[] datas = line.split(“,”);
return new AdsClickLog(Long.valueOf(datas[0]),
Long.valueOf(datas[1]),
datas[2],
datas[3],
Long.valueOf(datas[4]));
})
.assignTimestampsAndWatermarks(wms)
// 安装 (省份, 广告) 分组
.keyBy(new KeySelector<AdsClickLog, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> getKey(AdsClickLog log) throws Exception {
return Tuple2.of(log.getProvince(), log.getAdId());
}
})
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.seconds(10)))
.allowedLateness(Time.seconds(10))
.sideOutputLateData(new OutputTag(“ads_late”) {
})
.aggregate(new AggregateFunction<AdsClickLog, Long, Long>() {
@Override
public Long createAccumulator() {
return 0L;
}@Override public Long add(AdsClickLog value, Long accumulator) { return accumulator + 1L; } @Override public Long getResult(Long accumulator) { return accumulator; } @Override public Long merge(Long a, Long b) { return a + b; } }, new ProcessWindowFunction<Long, Tuple4<String, Long, Long, Long>, Tuple2<String, Long>, TimeWindow>() { @Override public void process(Tuple2<String, Long> key, Context ctx, Iterable<Long> elements, Collector<Tuple4<String, Long, Long, Long>> out) throws Exception { out.collect(Tuple4.of(key.f0, key.f1, elements.iterator().next(), ctx.window().getEnd())); } }) .keyBy(t -> t.f3) .process(new KeyedProcessFunction<Long, Tuple4<String, Long, Long, Long>, String>() { private ValueState<Long> windowEnd; private ListState<Tuple4<String, Long, Long, Long>> datas; @Override public void open(Configuration parameters) throws Exception { datas = getRuntimeContext() .getListState(new ListStateDescriptor<Tuple4<String, Long, Long, Long>>("datas", TypeInformation.of(new TypeHint<Tuple4<String, Long, Long, Long>>() { }))); windowEnd = getRuntimeContext().getState(new ValueStateDescriptor<Long>("windowEnd", Long.class)); } @Override public void processElement(Tuple4<String, Long, Long, Long> value, Context ctx, Collector<String> out) throws Exception { // 存数据 datas.add(value); // 注册定时器 if (windowEnd.value() == null) { ctx.timerService().registerEventTimeTimer(value.f3 + 10L); windowEnd.update(value.f3); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { ArrayList<Tuple4<String, Long, Long, Long>> result = new ArrayList<>(); for (Tuple4<String, Long, Long, Long> t : datas.get()) { result.add(t); } // 清空状态 windowEnd.clear(); datas.clear(); // 排序, 取top3 result.sort(new Comparator<Tuple4<String, Long, Long, Long>>() { @Override public int compare(Tuple4<String, Long, Long, Long> o1, Tuple4<String, Long, Long, Long> o2) { return (int) (o2.f2 - o1.f2); } }); // 返回的数据 StringBuilder sb = new StringBuilder(); sb.append("窗口结束时间: ").append(timestamp - 10).append("\n"); sb.append("---------------------------------\n"); for (int i = 0; i < Math.min(3, result.size()); i++) { sb.append(result.get(i)).append("\n"); } sb.append("---------------------------------\n\n"); out.collect(sb.toString()); } }) .print(); env.execute();
}
}
黑名单过滤
我们进行的点击量统计,同一用户的重复点击是会叠加计算的。
在实际场景中,同一用户确实可能反复点开同一个广告,这也说明了用户对广告更大的兴趣;但是如果用户在一段时间非常频繁地点击广告,这显然不是一个正常行为,有刷点击量的嫌疑。
所以我们可以对一段时间内(比如一天内)的用户点击行为进行约束,如果对同一个广告点击超过一定限额(比如100次),应该把该用户加入黑名单并报警,此后其点击行为不应该再统计。
两个功能:
- 告警: 使用侧输出流
- 已经进入黑名单的用户的广告点击记录不再进行统计
package com.atguigu.flink.java.chapter_8;
import com.atguigu.flink.java.chapter_6.AdsClickLog;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.*;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 22:29
*/
public class Flink06_Project_AdsClick_BlackList {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 创建WatermarkStrategy
WatermarkStrategy wms = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(AdsClickLog element, long recordTimestamp) {
return element.getTimestamp() * 1000L;
}
});
SingleOutputStreamOperator result = env
.readTextFile(“input/AdClickLog.csv”)
.map(line -> {
String[] datas = line.split(“,”);
return new AdsClickLog(Long.valueOf(datas[0]),
Long.valueOf(datas[1]),
datas[2],
datas[3],
Long.valueOf(datas[4]));
})
.assignTimestampsAndWatermarks(wms)
// 按照装 (用户, 广告) 分组
.keyBy(new KeySelector<AdsClickLog, Tuple2<Long, Long>>() {
@Override
public Tuple2<Long, Long> getKey(AdsClickLog log) throws Exception {
return Tuple2.of(log.getUserId(), log.getAdId());
}
})
// 1. 使用process 实现黑名单过滤
.process(new KeyedProcessFunction<Tuple2<Long, Long>, AdsClickLog, String>() {
private ValueState warned;
private ValueState clickCount;@Override public void open(Configuration parameters) throws Exception { clickCount = getRuntimeContext().getState(new ValueStateDescriptor<Long>("clickCount", Long.class)); warned = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("warned", Boolean.class)); } @Override public void processElement(AdsClickLog ele, Context ctx, Collector<String> out) throws Exception { // 1. 统计次数 if (clickCount.value() == null) { // 如果是第一条元素则把值更新为1 // 每天的第一条数据注册定时器, 明天0:0:0 触发这个定时器 long now = ctx.timestamp(); LocalDate today = LocalDateTime .ofEpochSecond(now / 1000, 0, ZoneOffset.ofHours(8)).toLocalDate(); long tomorrow = LocalDateTime .of(today.plusDays(1), LocalTime.of(0, 0, 0)) .toEpochSecond(ZoneOffset.ofHours(8)); ctx.timerService().registerEventTimeTimer(tomorrow); clickCount.update(1L); out.collect("用户: " + ele.getUserId() + ",广告: " + ele.getAdId() + ",点击量: " + clickCount.value()); } else if (clickCount.value() < 99) { // 小于100 则更新记数 clickCount.update(clickCount.value() + 1L); out.collect("用户: " + ele.getUserId() + ",广告: " + ele.getAdId() + ",点击量: " + clickCount.value()); } else { // 产生告警信息 if (warned.value() == null) { // 每天只报警一次 String msg = "用户: " + ele.getUserId() + "对广告: " + ele.getAdId() + "的点击量是: " + (clickCount.value() + 1L); ctx.output(new OutputTag<String>("黑名单") {}, msg); warned.update(true); } } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { // 黑名单相关数据是应该每天一清零: 使用定时器, 在每天的0:0:0清零数据 warned.clear(); clickCount.clear(); } }); result.print("正常数据"); result.getSideOutput(new OutputTag<String>("黑名单") {}).print("黑名单"); env.execute();
}
}
恶意登录监控
对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。
因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。这是电商网站、也是几乎所有网站风控的基本一环。
数据源
文件: LoginLog.csv
封装数据的JavaBean类
package com.atguigu.flink.java.chapter_8;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
- @Author lizhenchao@atguigu.cn
- @Date 2021/1/7 17:05
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LoginEvent {
private Long userId;
private String ip;
private String eventType;
private Long eventTime;
}
具体实现代码
实现逻辑:
统计连续失败的次数:
- 把失败的时间戳放入到List中,
- 当List中的长度到达2的时候, 判断这个两个时间戳的差是否小于等于2s
- 如果是, 则这个用户在恶意登录
- 否则不是, 然后删除List的第一个元素
- 用于保持List的长度为2
- 如果出现成功, 则需要清空List集合
package com.atguigu.flink.java.chapter_8;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 22:29
*/
public class Flink07_Project_Login {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 创建WatermarkStrategy
WatermarkStrategy wms = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(LoginEvent element, long recordTimestamp) {
return element.getEventTime();
}
});
env
.readTextFile(“input/LoginLog.csv”)
.map(line -> {
String[] data = line.split(“,”);
return new LoginEvent(Long.valueOf(data[0]),
data[1],
data[2],
Long.parseLong(data[3]) * 1000L);
})
.assignTimestampsAndWatermarks(wms)
// 按照用户id分组
.keyBy(LoginEvent::getUserId)
.process(new KeyedProcessFunction<Long, LoginEvent, String>() {private ListState<Long> failTss; @Override public void open(Configuration parameters) throws Exception { failTss = getRuntimeContext().getListState(new ListStateDescriptor<Long>("failTss", Long.class)); } @Override public void processElement(LoginEvent value, Context ctx, Collector<String> out) throws Exception { /* 统计连续失败的次数: 1. 把失败的时间戳放入到List中, 2. 当List中的长度到达2的时候, 判断这个两个时间戳的差是否小于等于2s 3. 如果是, 则这个用户在恶意登录 4. 否则不是, 然后删除List的第一个元素用于保持List的长度为2 6. 如果出现登录成功, 则需要清空List集合, 重新开始计算 */ switch (value.getEventType()) { case "fail": // 1. 把时间戳放入到集合中 failTss.add(value.getEventTime()); // 2. 把状态中的元素转存到ArrayList中 ArrayList<Long> tss = new ArrayList<>(); for (Long ts : failTss.get()) { tss.add(ts); } // 3. 如果长度等于2, 判断2次失败的时间是否在2秒以内 // 3.1 如果是则报警 // 3.2 否则, 应该删除第一条数据删除 if (tss.size() == 2) { long delta = tss.get(1) - tss.get(0); if (delta / 1000 <= 2) { out.collect(value.getUserId() + " 在恶意登录, 请注意!!!"); } else { tss.remove(0); failTss.update(tss); } } break; case "success": failTss.clear(); break; default: } } }) .print(); env.execute();
}
}
订单支付实时监控
在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。
对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。
另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。
package com.atguigu.flink.java.chapter_8;
import com.atguigu.flink.java.chapter_6.OrderEvent;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 22:29
*/
public class Flink08_Project_OrderWatch {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 创建WatermarkStrategy
WatermarkStrategy wms = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(OrderEvent element, long recordTimestamp) {
return element.getEventTime();
}
});
env
.readTextFile(“input/OrderLog.csv”)
.map(line -> {
String[] datas = line.split(“,”);
return new OrderEvent(
Long.valueOf(datas[0]),
datas[1],
datas[2],
Long.parseLong(datas[3]) * 1000);}) .assignTimestampsAndWatermarks(wms) .keyBy(OrderEvent::getOrderId) .process(new KeyedProcessFunction<Long, OrderEvent, String>() { private ValueState<Long> timeoutTs; private ValueState<OrderEvent> createEvent; private ValueState<OrderEvent> payEvent; @Override public void open(Configuration parameters) throws Exception { createEvent = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("createEvent", OrderEvent.class)); payEvent = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("payEvent", OrderEvent.class)); timeoutTs = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timeoutTs", Long.class)); } @Override public void processElement(OrderEvent value, Context ctx, Collector<String> out) throws Exception { // 判断数据类型: 下单or支付 String eventType = value.getEventType(); // a: 有支付又有订单的情况 if ("create".equals(eventType)) { // 判断支付是否已经来了 if (payEvent.value() == null) { // 支付还没来, 把自己保存 createEvent.update(value); } else { // 支付来过: 判断创建订单和支付时间间隔是否在规定时间内:比如15分钟 if (payEvent.value().getEventTime() - value.getEventTime() <= 15 * 60 * 1000) { out.collect("订单: " + value.getOrderId() + " 正常支付"); } else { out.collect("订单: " + value.getOrderId() + " 在超时时间完成的支付, 系统可能存在漏洞"); } payEvent.clear(); // 清楚支付的状态 } } else { // 判断下单是否已经来了 if (createEvent.value() == null) { payEvent.update(value); } else { if (value.getEventTime() - createEvent.value().getEventTime() <= 15 * 60 * 1000) { out.collect("订单: " + value.getOrderId() + " 正常支付"); } else { out.collect("订单: " + value.getOrderId() + " 在超时时间完成的支付, 系统可能存在漏洞"); } createEvent.clear(); } } // b: 只有支付或者只有订单的情况 如果超过20分钟,没有收到另外一方的信息 // 使用定时器来处理这种异常情况 if (timeoutTs.value() == null) { // 第一条数据过来 ctx.timerService().registerEventTimeTimer(value.getEventTime() + 20 * 60 * 1000L); timeoutTs.update(value.getEventTime() + 20 * 60 * 1000L); } else { // 第2条数据过来了, 删除定时器 ctx.timerService().deleteEventTimeTimer(timeoutTs.value()); timeoutTs.clear(); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { if (payEvent.value() == null) { // 说明payEvent没来 out.collect("订单: " + ctx.getCurrentKey() + " 支付超时, 订单自动取消!"); }else{ //说明createEvent没来 out.collect("订单: " + ctx.getCurrentKey() + " 成功支付, 但是没有下单数据, 请检查系统!"); } payEvent.clear(); createEvent.clear(); timeoutTs.clear(); } }) .print(); env.execute();
}
}
Flink CEP编程
什么是FlinkCEP
FlinkCEP(Complex event processing for Flink) 是在Flink实现的复杂事件处理库. 它可以让你在无界流中检测出特定的数据,有机会掌握数据中重要的那部分。
是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。
目标:从有序的简单事件流中发现一些高阶特征
输入:一个或多个由简单事件构成的事件流
处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
输出:满足规则的复杂事件Flink CEP应用场景
风险控制
对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
策略营销
用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
运维监控
灵活配置多指标、多依赖来实现更复杂的监控模式。
CEP开发基本步骤
导入CEP相关依赖
org.apache.flink
flink-cep_ s c a l a . b i n a r y . v e r s i o n < / a r t i f a c t I d > < v e r s i o n > {scala.binary.version}</artifactId> <version> scala.binary.version</artifactId><version>{flink.version}
基本使用
package com.atguigu.flink.java.chapter_9;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/8 21:41
*/
public class Flink01_CEP_BasicUse {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);SingleOutputStreamOperator<WaterSensor> waterSensorStream = env .readTextFile("input/sensor.txt") .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] split = value.split(","); return new WaterSensor(split[0], Long.parseLong(split[1]) * 1000, Integer.parseInt(split[2])); } }) .assignTimestampsAndWatermarks(WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((element, recordTimestamp) -> element.getTs())); // 1. 定义模式 Pattern<WaterSensor, WaterSensor> pattern = Pattern .<WaterSensor>begin("start") .where(new SimpleCondition<WaterSensor>() { @Override public boolean filter(WaterSensor value) throws Exception { return "sensor_1".equals(value.getId()); } }); // 2. 在流上应用模式 PatternStream<WaterSensor> waterSensorPS = CEP.pattern(waterSensorStream, pattern); // 3. 获取匹配到的结果 waterSensorPS .select(new PatternSelectFunction<WaterSensor, String>() { @Override public String select(Map<String, List<WaterSensor>> pattern) throws Exception { return pattern.toString(); } }) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
}
}
sensor.txt数据:
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
结果:
模式API
模式API可以让你定义想从输入流中抽取的复杂模式序列。
几个概念:
模式:
比如找拥有相同属性事件序列的模式(前面案例中的拥有相同的id), 我们一般把简单模式称之为模式
注意:
每个模式必须有一个独一无二的名字,你可以在后面使用它来识别匹配到的事件。(比如前面的start模式)
模式的名字不能包含字符":"
模式序列
每个复杂的模式序列包括多个简单的模式,也叫模式序列. 你可以把模式序列看作是这样的模式构成的图, 这些模式基于用户指定的条件从一个转换到另外一个
匹配
输入事件的一个序列,这些事件通过一系列有效的模式转换,能够访问到复杂模式图中的所有模式。
单个模式
单个模式可以是单例模式或者循环模式.
单例模式
单例模式只接受一个事件. 默认情况模式都是单例模式.
前面的例子就是一个单例模式
循环模式
循环模式可以接受多个事件.
单例模式配合上量词就是循环模式.(非常类似我们熟悉的正则表达式)
固定次数
// 1. 定义模式
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_1”.equals(value.getId());
}
});
// 1.1 使用量词 出现两次
Pattern<WaterSensor, WaterSensor> patternWithQuantifier = pattern.times(2);
范围内的次数
// 1.1 使用量词 [2,4] 2次,3次或4次
Pattern<WaterSensor, WaterSensor> patternWithQuantifier = pattern.times(2, 4);
一次或多次
Pattern<WaterSensor, WaterSensor> patternWithQuantifier = pattern.oneOrMore();
多次及多次以上
// 2次或2次一样
Pattern<WaterSensor, WaterSensor> patternWithQuantifier = pattern.timesOrMore(2);
条件
对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,例如前面用到的where就是一种条件
迭代条件
这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new IterativeCondition() {
@Override
public boolean filter(WaterSensor value, Context ctx) throws Exception {
return “sensor_1”.equals(value.getId());
}
});
简单条件
这种类型的条件扩展了前面提到的IterativeCondition类,它决定是否接受一个事件只取决于事件自身的属性。
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
System.out.println(value);
return “sensor_1”.equals(value.getId());
}
});
组合条件
把多个条件结合起来使用. 这适用于任何条件,你可以通过依次调用where()来组合条件。 最终的结果是每个单一条件的结果的逻辑AND。
如果想使用OR来组合条件,你可以像下面这样使用or()方法。
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new IterativeCondition() {
@Override
public boolean filter(WaterSensor value, Context ctx) throws Exception {
return “sensor_1”.equals(value.getId());
}
})
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return value.getVc() > 30;
}
})
.or(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return value.getTs() > 3000;
}
});
停止条件
如果使用循环模式(oneOrMore, timesOrMore), 可以指定一个停止条件, 否则有可能会内存吃不消.
意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new IterativeCondition() {
@Override
public boolean filter(WaterSensor value, Context ctx) throws Exception {
return “sensor_1”.equals(value.getId());
}
})
.timesOrMore(2)
.until(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return value.getVc() >= 40;
}
});
组合模式(模式序列)
把多个单个模式组合在一起就是组合模式. 组合模式由一个初始化模式(.begin(…))开头
严格连续
期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_1”.equals(value.getId());
}
})
.next(“end”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_2”.equals(value.getId());
}
});
注意:
notNext 如果不想后面直接连着一个特定事件
松散连续
忽略匹配的事件之间的不匹配的事件。
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_1”.equals(value.getId());
}
})
.followedBy(“end”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_2”.equals(value.getId());
}
});
注意:
notFollowBy 如果不想一个特定事件发生在两个事件之间的任何地方。(notFollowBy不能位于事件的最后)
非确定的松散连续
更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配
当且仅当数据为a,c,b,b时,对于followedBy模式而言命中的为{a,b},对于followedByAny而言会有两次命中{a,b},{a,b}
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_1”.equals(value.getId());
}
})
.followedByAny(“end”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_2”.equals(value.getId());
}
});
模式知识补充
循环模式的连续性
前面的连续性也可以运用在单个循环模式中. 连续性会被运用在被接受进入模式的事件之间。
松散连续
默认是松散连续
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_1”.equals(value.getId());
}
})
.times(2);
严格连续
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_1”.equals(value.getId());
}
})
.times(2)
.consecutive();
非确定的松散连续
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_1”.equals(value.getId());
}
})
.times(2)
.allowCombinations();
循环模式的贪婪性
在组合模式情况下, 对次数的处理尽快能获取最多个的那个次数, 就是贪婪!当一个事件同时满足两个模式的时候起作用.
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_1”.equals(value.getId());
}
}).times(2, 3).greedy()
.next(“end”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return value.getVc() == 30;
}
});
数据:
sensor_1,1,10
sensor_1,2,20
sensor_1,3,30
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
结果:
{start=[WaterSensor(id=sensor_1, ts=1000, vc=10), WaterSensor(id=sensor_1, ts=2000, vc=20), WaterSensor(id=sensor_1, ts=3000, vc=30)], end=[WaterSensor(id=sensor_2, ts=4000, vc=30)]}
{start=[WaterSensor(id=sensor_1, ts=2000, vc=20), WaterSensor(id=sensor_1, ts=3000, vc=30)], end=[WaterSensor(id=sensor_2, ts=4000, vc=30)]}
分析:
sensor_1,3,30 在匹配的的时候, 既能匹配第一个模式也可以匹配的第二个模式, 由于第一个模式使用量词则使用greedy的时候会优先匹配第一个模式, 因为要尽可能多的次数
注意:
一般贪婪比非贪婪的结果要少!
模式组不能设置为greedy
模式可选性
可以使用pattern.optional()方法让所有的模式变成可选的,不管是否是循环模式
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_1”.equals(value.getId());
}
}).times(2).optional() // 0次或2次
.next(“end”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_2”.equals(value.getId());
}
});
说明:
start模式可能会没有!
模式组
在前面的代码中次数只能用在某个模式上, 比如: .begin(…).where(…).next(…).where(…).times(2) 这里的次数只会用在next这个模式上, 而不会用在begin模式上.
如果需要用在多个模式上,可以使用模式组!
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(Pattern
.begin(“start”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_1”.equals(value.getId());
}
})
.next(“next”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_2”.equals(value.getId());
}
}))
.times(2);
结果: sensor_1,sensor_2, sensor_1, sensor_2
超时数据
当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.begin(“start”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_1”.equals(value.getId());
}
})
.next(“end”)
.where(new SimpleCondition() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return “sensor_2”.equals(value.getId());
}
})
.within(Time.seconds(2));
匹配后跳过策略
对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。 有五种跳过策略,如下:
NO_SKIP: 每个成功的匹配都会被输出。
SKIP_TO_NEXT: 丢弃以相同事件开始的所有部分匹配。
SKIP_PAST_LAST_EVENT: 丢弃起始在这个匹配的开始和结束之间的所有部分匹配。
SKIP_TO_FIRST: 丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。
SKIP_TO_LAST: 丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。
AfterMatchSkipStrategy skipStrategy = …
Pattern.begin(“patternName”, skipStrategy);
Flink CEP编程实战
恶意登录监控
package com.atguigu.flink.chapter10;
import com.atguigu.flink.bean.LoginEvent;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/30 10:26
*/
public class Flink01_Project_Login {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 创建WatermarkStrategy
WatermarkStrategy wms = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(LoginEvent element, long recordTimestamp) {
return element.getEventTime();
}
});
KeyedStream<LoginEvent, Long> loginKS = env
.readTextFile(“input/LoginLog.csv”)
.map(line -> {
String[] data = line.split(“,”);
return new LoginEvent(Long.valueOf(data[0]),
data[1],
data[2],
Long.parseLong(data[3]) * 1000L);
})
.assignTimestampsAndWatermarks(wms)
.keyBy(LoginEvent::getUserId);// 1. 定义模式 Pattern<LoginEvent, LoginEvent> pattern = Pattern .<LoginEvent>begin("fail") .where(new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent value) throws Exception { return "fail".equals(value.getEventType()); } }) .timesOrMore(2).consecutive() .until(new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent value) throws Exception { return "success".equals(value.getEventType()); } }) .within(Time.seconds(2)); // 2. 把模式使用在流上 PatternStream<LoginEvent> ps = CEP.pattern(loginKS, pattern); // 3. 获取数据 ps .select(new PatternSelectFunction<LoginEvent, String>() { @Override public String select(Map<String, List<LoginEvent>> pattern) throws Exception { return pattern.get("fail").toString(); } }) .print(); env.execute();
}
}
订单支付实时监控
package com.atguigu.flink.java.chapter_10;
import com.atguigu.flink.java.chapter_6.OrderEvent;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2020/12/10 22:29
*/
public class Flink02_CEP_Project_OrderWatch {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 创建WatermarkStrategy
WatermarkStrategy wms = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(OrderEvent element, long recordTimestamp) {
return element.getEventTime();
}
});
KeyedStream<OrderEvent, Long> orderKS = env
.readTextFile(“input/OrderLog.csv”)
.map(line -> {
String[] datas = line.split(“,”);
return new OrderEvent(
Long.valueOf(datas[0]),
datas[1],
datas[2],
Long.parseLong(datas[3]) * 1000);}) .assignTimestampsAndWatermarks(wms) .keyBy(OrderEvent::getOrderId); Pattern<OrderEvent, OrderEvent> orderEventPattern = Pattern .<OrderEvent>begin("create") .where(new SimpleCondition<OrderEvent>() { @Override public boolean filter(OrderEvent value) throws Exception { return "create".equals(value.getEventType()); } }) .next("pay") .where(new SimpleCondition<OrderEvent>() { @Override public boolean filter(OrderEvent value) throws Exception { return "pay".equals(value.getEventType()); } }) .within(Time.minutes(15)); PatternStream<OrderEvent> pattern = CEP.pattern(orderKS, orderEventPattern); SingleOutputStreamOperator<String> result = pattern .select(new OutputTag<String>("timeout"){}, new PatternTimeoutFunction<OrderEvent, String>() { @Override public String timeout(Map<String, List<OrderEvent>> pattern, long timeoutTimestamp) throws Exception { return pattern.toString(); } }, new PatternSelectFunction<OrderEvent, String>() { @Override public String select(Map<String, List<OrderEvent>> pattern) throws Exception { return pattern.toString(); } }); result.getSideOutput(new OutputTag<String>("timeout") {}).print(); env.execute();
}
}
Flink SQL编程
如果用户需要同时流计算、批处理的场景下,用户需要维护两套业务代码,开发人员也要维护两套技术栈,非常不方便。
Flink 社区很早就设想过将批数据看作一个有界流数据,将批处理看作流计算的一个特例,从而实现流批统一,Flink 社区的开发人员在多轮讨论后,基本敲定了Flink 未来的技术架构
Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。
Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。
Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。这两种 API 中的查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。
Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已经预处理好的数据。
注意:Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的组合都是支持的。
核心概念
Flink 的 Table API 和 SQL 是流批统一的 API。 这意味着 Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。 因为传统的关系代数以及 SQL 最开始都是为了批式处理而设计的, 关系型查询在流式场景下不如在批式场景下容易理解.
动态表和连续查询
动态表(Dynamic Tables) 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。
与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。
需要注意的是,连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。
将流转换为动态表。
在动态表上计算一个连续查询,生成一个新的动态表。
生成的动态表被转换回流。
在流上定义表(动态表)
为了使用关系查询处理流,必须将其转换成 Table。从概念上讲,流的每条记录都被解释为对结果表的 INSERT 操作。
假设有如下格式的数据:
[
user: VARCHAR, // 用户名
cTime: TIMESTAMP, // 访问 URL 的时间
url: VARCHAR // 用户访问的 URL
]
下图显示了单击事件流(左侧)如何转换为表(右侧)。当插入更多的单击流记录时,结果表的数据将不断增长。
连续查询
在动态表上计算一个连续查询,并生成一个新的动态表。与批处理查询不同,连续查询从不终止,并根据其输入表上的更新更新其结果表。
在任何时候,连续查询的结果在语义上与以批处理模式在输入表快照上执行的相同查询的结果相同。
说明:
当查询开始,clicks 表(左侧)是空的。
当第一行数据被插入到 clicks 表时,查询开始计算结果表。第一行数据 [Mary,./home] 插入后,结果表(右侧,上部)由一行 [Mary, 1] 组成。
当第二行 [Bob, ./cart] 插入到 clicks 表时,查询会更新结果表并插入了一行新数据 [Bob, 1]。
第三行 [Mary, ./prod?id=1] 将产生已计算的结果行的更新,[Mary, 1] 更新成 [Mary, 2]。
最后,当第四行数据加入 clicks 表时,查询将第三行 [Liz, 1] 插入到结果表中。
Flink Table API
导入需要的依赖
org.apache.flink
flink-table-planner-blink_
s
c
a
l
a
.
b
i
n
a
r
y
.
v
e
r
s
i
o
n
<
/
a
r
t
i
f
a
c
t
I
d
>
<
v
e
r
s
i
o
n
>
{scala.binary.version}</artifactId> <version>
scala.binary.version</artifactId><version>{flink.version}
provided
org.apache.flink
flink-streaming-scala_
s
c
a
l
a
.
b
i
n
a
r
y
.
v
e
r
s
i
o
n
<
/
a
r
t
i
f
a
c
t
I
d
>
<
v
e
r
s
i
o
n
>
{scala.binary.version}</artifactId> <version>
scala.binary.version</artifactId><version>{flink.version}
provided
org.apache.flink
flink-csv
f
l
i
n
k
.
v
e
r
s
i
o
n
<
/
v
e
r
s
i
o
n
>
<
/
d
e
p
e
n
d
e
n
c
y
>
<
d
e
p
e
n
d
e
n
c
y
>
<
g
r
o
u
p
I
d
>
o
r
g
.
a
p
a
c
h
e
.
f
l
i
n
k
<
/
g
r
o
u
p
I
d
>
<
a
r
t
i
f
a
c
t
I
d
>
f
l
i
n
k
−
j
s
o
n
<
/
a
r
t
i
f
a
c
t
I
d
>
<
v
e
r
s
i
o
n
>
{flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>
flink.version</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink−json</artifactId><version>{flink.version}
基本使用:表与DataStream的混合使用
package com.atguigu.flink.java.chapter_11;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/11 21:43
*/
public class Flink01_TableApi_BasicUse {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource waterSensorStream =
env.fromElements(new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60));// 1. 创建表的执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 2. 创建表: 将流转换成动态表. 表的字段名从pojo的属性名自动抽取 Table table = tableEnv.fromDataStream(waterSensorStream); // 3. 对动态表进行查询 Table resultTable = table .where($("id").isEqual("sensor_1")) .select($("id"), $("ts"), $("vc")); // 4. 把动态表转换成流 DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class); resultStream.print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
}
}
基本使用:聚合操作
// 3. 对动态表进行查询
Table resultTable = table
.where( ( " v c " ) . i s G r e a t e r O r E q u a l ( 20 ) ) . g r o u p B y ( ("vc").isGreaterOrEqual(20)) .groupBy( ("vc").isGreaterOrEqual(20)).groupBy((“id”))
.aggregate( ( " v c " ) . s u m ( ) . a s ( " v c s u m " ) ) . s e l e c t ( ("vc").sum().as("vc_sum")) .select( ("vc").sum().as("vcsum")).select((“id”), $(“vc_sum”));
// 4. 把动态表转换成流 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
表到流的转换
动态表可以像普通数据库表一样通过 INSERT、UPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 修改,或者介于两者之间的其他表。
在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化:
Append-only 流
仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。
Retract 流
retract 流包含两种类型的 message: add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。
Upsert 流
upsert 流包含两种类型的 message: upsert messages 和delete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message ,将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性,以便正确地应用 message。与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。下图显示了将动态表转换为 upsert 流的过程。
请注意,在将动态表转换为 DataStream 时,只支持 append 流和 retract 流。
通过Connector声明读入数据
前面是先得到流, 再转成动态表, 其实动态表也可以直接连接到数据
File source
// 2. 创建表
// 2.1 表的元数据信息
Schema schema = new Schema()
.field(“id”, DataTypes.STRING())
.field(“ts”, DataTypes.BIGINT())
.field(“vc”, DataTypes.INT());
// 2.2 连接文件, 并创建一个临时表, 其实就是一个动态表
tableEnv.connect(new FileSystem().path(“input/sensor.txt”))
.withFormat(new Csv().fieldDelimiter(‘,’).lineDelimiter(“\n”))
.withSchema(schema)
.createTemporaryTable(“sensor”);
// 3. 做成表对象, 然后对动态表进行查询
Table sensorTable = tableEnv.from(“sensor”);
Table resultTable = sensorTable
.groupBy(
(
"
i
d
"
)
)
.
s
e
l
e
c
t
(
("id")) .select(
("id")).select((“id”),
(
"
i
d
"
)
.
c
o
u
n
t
(
)
.
a
s
(
"
c
n
t
"
)
)
;
/
/
4.
把
动
态
表
转
换
成
流
.
如
果
涉
及
到
数
据
的
更
新
,
要
用
到
撤
回
流
.
多
个
了
一
个
b
o
o
l
e
a
n
标
记
D
a
t
a
S
t
r
e
a
m
<
T
u
p
l
e
2
<
B
o
o
l
e
a
n
,
R
o
w
>
>
r
e
s
u
l
t
S
t
r
e
a
m
=
t
a
b
l
e
E
n
v
.
t
o
R
e
t
r
a
c
t
S
t
r
e
a
m
(
r
e
s
u
l
t
T
a
b
l
e
,
R
o
w
.
c
l
a
s
s
)
;
r
e
s
u
l
t
S
t
r
e
a
m
.
p
r
i
n
t
(
)
;
K
a
f
k
a
S
o
u
r
c
e
/
/
2.
创
建
表
/
/
2.1
表
的
元
数
据
信
息
S
c
h
e
m
a
s
c
h
e
m
a
=
n
e
w
S
c
h
e
m
a
(
)
.
f
i
e
l
d
(
"
i
d
"
,
D
a
t
a
T
y
p
e
s
.
S
T
R
I
N
G
(
)
)
.
f
i
e
l
d
(
"
t
s
"
,
D
a
t
a
T
y
p
e
s
.
B
I
G
I
N
T
(
)
)
.
f
i
e
l
d
(
"
v
c
"
,
D
a
t
a
T
y
p
e
s
.
I
N
T
(
)
)
;
/
/
2.2
连
接
文
件
,
并
创
建
一
个
临
时
表
,
其
实
就
是
一
个
动
态
表
t
a
b
l
e
E
n
v
.
c
o
n
n
e
c
t
(
n
e
w
K
a
f
k
a
(
)
.
v
e
r
s
i
o
n
(
"
u
n
i
v
e
r
s
a
l
"
)
.
t
o
p
i
c
(
"
s
e
n
s
o
r
"
)
.
s
t
a
r
t
F
r
o
m
L
a
t
e
s
t
(
)
.
p
r
o
p
e
r
t
y
(
"
g
r
o
u
p
.
i
d
"
,
"
b
i
g
d
a
t
a
"
)
.
p
r
o
p
e
r
t
y
(
"
b
o
o
t
s
t
r
a
p
.
s
e
r
v
e
r
s
"
,
"
h
a
d
o
o
p
162
:
9092
,
h
a
d
o
o
p
163
:
9092
,
h
a
d
o
o
p
164
:
9092
"
)
)
.
w
i
t
h
F
o
r
m
a
t
(
n
e
w
J
s
o
n
(
)
)
.
w
i
t
h
S
c
h
e
m
a
(
s
c
h
e
m
a
)
.
c
r
e
a
t
e
T
e
m
p
o
r
a
r
y
T
a
b
l
e
(
"
s
e
n
s
o
r
"
)
;
/
/
3.
对
动
态
表
进
行
查
询
T
a
b
l
e
s
e
n
s
o
r
T
a
b
l
e
=
t
a
b
l
e
E
n
v
.
f
r
o
m
(
"
s
e
n
s
o
r
"
)
;
T
a
b
l
e
r
e
s
u
l
t
T
a
b
l
e
=
s
e
n
s
o
r
T
a
b
l
e
.
g
r
o
u
p
B
y
(
("id").count().as("cnt")); // 4. 把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记 DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class); resultStream.print(); Kafka Source // 2. 创建表 // 2.1 表的元数据信息 Schema schema = new Schema() .field("id", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) .field("vc", DataTypes.INT()); // 2.2 连接文件, 并创建一个临时表, 其实就是一个动态表 tableEnv .connect(new Kafka() .version("universal") .topic("sensor") .startFromLatest() .property("group.id", "bigdata") .property("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092")) .withFormat(new Json()) .withSchema(schema) .createTemporaryTable("sensor"); // 3. 对动态表进行查询 Table sensorTable = tableEnv.from("sensor"); Table resultTable = sensorTable .groupBy(
("id").count().as("cnt"));//4.把动态表转换成流.如果涉及到数据的更新,要用到撤回流.多个了一个boolean标记DataStream<Tuple2<Boolean,Row>>resultStream=tableEnv.toRetractStream(resultTable,Row.class);resultStream.print();KafkaSource//2.创建表//2.1表的元数据信息Schemaschema=newSchema().field("id",DataTypes.STRING()).field("ts",DataTypes.BIGINT()).field("vc",DataTypes.INT());//2.2连接文件,并创建一个临时表,其实就是一个动态表tableEnv.connect(newKafka().version("universal").topic("sensor").startFromLatest().property("group.id","bigdata").property("bootstrap.servers","hadoop162:9092,hadoop163:9092,hadoop164:9092")).withFormat(newJson()).withSchema(schema).createTemporaryTable("sensor");//3.对动态表进行查询TablesensorTable=tableEnv.from("sensor");TableresultTable=sensorTable.groupBy((“id”))
.select($(“id”), $(“id”).count().as(“cnt”));
// 4. 把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
resultStream.print();
通过Connector声明写出数据
File Sink
package com.atguigu.flink.java.chapter_11;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import static org.apache.flink.table.api.Expressions.$;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/11 21:43
*/
public class Flink02_TableApi_ToFileSystem {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource waterSensorStream =
env.fromElements(new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60));
// 1. 创建表的执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Table sensorTable = tableEnv.fromDataStream(waterSensorStream); Table resultTable = sensorTable .where($("id").isEqual("sensor_1") ) .select($("id"), $("ts"), $("vc")); // 创建输出表 Schema schema = new Schema() .field("id", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) .field("vc", DataTypes.INT()); tableEnv .connect(new FileSystem().path("output/sensor_id.txt")) .withFormat(new Csv().fieldDelimiter('|')) .withSchema(schema) .createTemporaryTable("sensor"); // 把数据写入到输出表中 resultTable.executeInsert("sensor");
}
}
Kafka Sink
package com.atguigu.flink.java.chapter_11;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import static org.apache.flink.table.api.Expressions.$;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/11 21:43
*/
public class Flink03_TableApi_ToKafka {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource waterSensorStream =
env.fromElements(new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60));
// 1. 创建表的执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Table sensorTable = tableEnv.fromDataStream(waterSensorStream); Table resultTable = sensorTable .where($("id").isEqual("sensor_1") ) .select($("id"), $("ts"), $("vc")); // 创建输出表 Schema schema = new Schema() .field("id", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) .field("vc", DataTypes.INT()); tableEnv .connect(new Kafka() .version("universal") .topic("sink_sensor") .sinkPartitionerRoundRobin() .property("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092")) .withFormat(new Json()) .withSchema(schema) .createTemporaryTable("sensor"); // 把数据写入到输出表中 resultTable.executeInsert("sensor");
}
}
其他Connector用法
参考官方文档: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connect.html
Flink SQL
基本使用
查询未注册的表
package com.atguigu.flink.java.chapter_11;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/11 21:43
*/
public class Flink05_SQL_BaseUse {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource waterSensorStream =
env.fromElements(new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 使用sql查询未注册的表 Table inputTable = tableEnv.fromDataStream(waterSensorStream); Table resultTable = tableEnv.sqlQuery("select * from " + inputTable + " where id='sensor_1'"); tableEnv.toAppendStream(resultTable, Row.class).print(); env.execute();
}
}
查询已注册的表
package com.atguigu.flink.java.chapter_11;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/11 21:43
*/
public class Flink05_SQL_BaseUse_2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource waterSensorStream =
env.fromElements(new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 使用sql查询一个已注册的表 // 1. 从流得到一个表 Table inputTable = tableEnv.fromDataStream(waterSensorStream); // 2. 把注册为一个临时视图 tableEnv.createTemporaryView("sensor", inputTable); // 3. 在临时视图查询数据, 并得到一个新表 Table resultTable = tableEnv.sqlQuery("select * from sensor where id='sensor_1'"); // 4. 显示resultTable的数据 tableEnv.toAppendStream(resultTable, Row.class).print(); env.execute();
}
}
Kafka到Kafka
使用sql从Kafka读数据, 并写入到Kafka中
package com.atguigu.flink.java.chapter_11;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/11 21:43
*/
public class Flink05_SQL_Kafka2Kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 1. 注册SourceTable: source_sensor tableEnv.executeSql("create table source_sensor (id string, ts bigint, vc int) with(" + "'connector' = 'kafka'," + "'topic' = 'topic_source_sensor'," + "'properties.bootstrap.servers' = 'hadoop162:9029,hadoop163:9092,hadoop164:9092'," + "'properties.group.id' = 'atguigu'," + "'scan.startup.mode' = 'latest-offset'," + "'format' = 'json'" + ")"); // 2. 注册SinkTable: sink_sensor tableEnv.executeSql("create table sink_sensor(id string, ts bigint, vc int) with(" + "'connector' = 'kafka'," + "'topic' = 'topic_sink_sensor'," + "'properties.bootstrap.servers' = 'hadoop162:9029,hadoop163:9092,hadoop164:9092'," + "'format' = 'json'" + ")"); // 3. 从SourceTable 查询数据, 并写入到 SinkTable tableEnv.executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'");
}
}
时间属性
像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。因此,Table API 中的表就需要提供逻辑时间属性来表示时间,以及支持时间相关的操作。
处理时间
DataStream 到 Table 转换时定义
处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它只能定义在 schema 定义的最后
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource waterSensorStream =
env.fromElements(new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60));
// 1. 创建表的执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 声明一个额外的字段来作为处理时间字段
Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $(“id”), $(“ts”), $(“vc”), $(“pt”).proctime());
sensorTable.print();
env.execute();
在创建表的 DDL 中定义
package com.atguigu.flink.java.chapter_11;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/11 21:43
*/
public class Flink06_TableApi_ProcessTime {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 创建表的执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建表, 声明一个额外的列作为处理时间
tableEnv.executeSql(“create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with(”
+ “‘connector’ = ‘filesystem’,”
+ “‘path’ = ‘input/sensor.txt’,”
+ “‘format’ = ‘csv’”
+ “)”);TableResult result = tableEnv.executeSql("select * from sensor"); result.print();
}
}
事件时间
事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。
DataStream 到 Table 转换时定义
事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。
在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:
在 schema 的结尾追加一个新的字段
替换一个已经存在的字段。
不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。
package com.atguigu.flink.java.chapter_11;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/11 21:43
*/
public class Flink07_TableApi_EventTime {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator waterSensorStream = env
.fromElements(new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((element, recordTimestamp) -> element.getTs())
);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table table = tableEnv // 用一个额外的字段作为事件时间属性 .fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("et").rowtime()); table.execute().print(); env.execute();
}
}
// 使用已有的字段作为时间属性
.fromDataStream(waterSensorStream, $(“id”), $(“ts”).rowtime(), $(“vc”));
在创建表的 DDL 中定义
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段.
package com.atguigu.flink.java.chapter_11;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/11 21:43
*/
public class Flink07_TableApi_EventTime_2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 作为事件时间的字段必须是 timestamp 类型, 所以根据 long 类型的 ts 计算出来一个 t tEnv.executeSql("create table sensor(" + "id string," + "ts bigint," + "vc int, " + "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," + "watermark for t as t - interval '5' second)" + "with(" + "'connector' = 'filesystem'," + "'path' = 'input/sensor.txt'," + "'format' = 'csv'" + ")"); tEnv.sqlQuery("select * from sensor").execute().print();
}
}
说明:
把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。
递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
有界乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。
窗口(window)
时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口然后根据时间段做计算了。
下面我们就来看看Table API和SQL中,怎么利用时间字段做窗口操作。
在Table API和SQL中,主要有两种窗口:Group Windows和Over Windows
Table API中使用窗口
Group Windows
分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。
Table API中的Group Windows都是使用.window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段一样引用。
滚动窗口
public class Flink08_TableApi_Window_1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator waterSensorStream = env
.fromElements(new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((element, recordTimestamp) -> element.getTs())
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv
.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));
table
.window(Tumble.over(lit(10).second()).on($("ts")).as("w")) // 定义滚动窗口并给窗口起一个别名
.groupBy($("id"), $("w")) // 窗口必须出现的分组字段中
.select($("id"), $("w").start(), $("w").end(), $("vc").sum())
.execute()
.print();
env.execute();
}
}
滑动窗口
.window(Slide.over(lit(10).second()).every(lit(5).second()).on(
(
"
t
s
"
)
)
.
a
s
(
"
w
"
)
)
会
话
窗
口
.
w
i
n
d
o
w
(
S
e
s
s
i
o
n
.
w
i
t
h
G
a
p
(
l
i
t
(
6
)
.
s
e
c
o
n
d
(
)
)
.
o
n
(
("ts")).as("w")) 会话窗口 .window(Session.withGap(lit(6).second()).on(
("ts")).as("w"))会话窗口.window(Session.withGap(lit(6).second()).on((“ts”)).as(“w”))
Over Windows
Over window聚合是标准SQL中已有的(Over子句),可以在查询的SELECT子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。
Table API提供了Over类,来配置Over窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。
无界的over window是使用常量指定的。也就是说,时间间隔要指定UNBOUNDED_RANGE,或者行计数间隔要指定UNBOUNDED_ROW。而有界的over window是用间隔的大小指定的。
Unbounded Over Windows
public class Flink09_TableApi_OverWindow_1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator waterSensorStream = env
.fromElements(new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((element, recordTimestamp) -> element.getTs())
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv
.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));
table
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_ROW).as("w"))
.select($("id"), $("ts"), $("vc").sum().over($("w")).as("sum_vc"))
.execute()
.print();
env.execute();
}
}
使用UNBOUNDED_RANGE
.window(Over.partitionBy(
(
"
i
d
"
)
)
.
o
r
d
e
r
B
y
(
("id")).orderBy(
("id")).orderBy((“ts”)).preceding(UNBOUNDED_RANGE).as(“w”))
说明:
Bounded Over Windows
// 当事件时间向前算3s得到一个窗口
.window(Over.partitionBy(
(
"
i
d
"
)
)
.
o
r
d
e
r
B
y
(
("id")).orderBy(
("id")).orderBy((“ts”)).preceding(lit(3).second()).as(“w”))
// 当行向前推算2行算一个窗口
.window(Over.partitionBy(
(
"
i
d
"
)
)
.
o
r
d
e
r
B
y
(
("id")).orderBy(
("id")).orderBy((“ts”)).preceding(rowInterval(2L)).as(“w”))
SQL API中使用窗口
Group Windows
SQL 查询的分组窗口是通过 GROUP BY 子句定义的。类似于使用常规 GROUP BY 语句的查询,窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果。以下是批处理表和流处理表支持的分组窗口函数:
分组窗口函数 描述
TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。
HOP(time_attr, interval, interval) 定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。
SESSION(time_attr, interval) 定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 作为事件时间的字段必须是 timestamp 类型, 所以根据 long 类型的 ts 计算出来一个 t
tEnv.executeSql(“create table sensor(” +
“id string,” +
“ts bigint,” +
"vc int, " +
“t as to_timestamp(from_unixtime(ts/1000,‘yyyy-MM-dd HH:mm:ss’)),” +
“watermark for t as t - interval ‘5’ second)” +
“with(”
+ “‘connector’ = ‘filesystem’,”
+ “‘path’ = ‘input/sensor.txt’,”
+ “‘format’ = ‘csv’”
+ “)”);
tEnv
.sqlQuery(
"SELECT id, " +
" TUMBLE_START(t, INTERVAL ‘1’ minute) as wStart, " +
" TUMBLE_END(t, INTERVAL ‘1’ minute) as wEnd, " +
" SUM(vc) sum_vc " +
"FROM sensor " +
“GROUP BY TUMBLE(t, INTERVAL ‘1’ minute), id”
)
.execute()
.print();
tEnv
.sqlQuery(
"SELECT id, " +
" hop_start(t, INTERVAL ‘1’ minute, INTERVAL ‘1’ hour) as wStart, " +
" hop_end(t, INTERVAL ‘1’ minute, INTERVAL ‘1’ hour) as wEnd, " +
" SUM(vc) sum_vc " +
"FROM sensor " +
“GROUP BY hop(t, INTERVAL ‘1’ minute, INTERVAL ‘1’ hour), id”
)
.execute()
.print();
Over Windows
tEnv
.sqlQuery(
"select " +
“id,” +
“vc,” +
“sum(vc) over(partition by id order by t rows between 1 PRECEDING and current row)” +
“from sensor”
)
.execute()
.print();
tEnv
.sqlQuery(
"select " +
“id,” +
“vc,” +
"count(vc) over w, " +
"sum(vc) over w " +
"from sensor " +
“window w as (partition by id order by t rows between 1 PRECEDING and current row)”
)
.execute()
.print();
Catalog
Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。
前面用到Connector其实就是在使用Catalog
Catalog类型
GenericInMemoryCatalog
GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。
JdbcCatalog
JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。PostgresCatalog 是当前实现的唯一一种 JDBC Catalog。
HiveCatalog
HiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。 Flink 的 Hive 文档 提供了有关设置 HiveCatalog 以及访问现有 Hive 元数据的详细信息。
HiveCatalog
导入需要的依赖
org.apache.flink
flink-connector-hive_
s
c
a
l
a
.
b
i
n
a
r
y
.
v
e
r
s
i
o
n
<
/
a
r
t
i
f
a
c
t
I
d
>
<
v
e
r
s
i
o
n
>
{scala.binary.version}</artifactId> <version>
scala.binary.version</artifactId><version>{flink.version}
String name = “myhive”; // Catalog 名字
String defaultDatabase = “flink_test”; // 默认数据库
String hiveConfDir = “c:/conf”; // hive配置文件的目录. 需要把hive-site.xml添加到该目录
// 1. 创建HiveCatalog
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
// 2. 注册HiveCatalog
tEnv.registerCatalog(name, hive);
// 3. 把 HiveCatalog: myhive 作为当前session的catalog
tEnv.useCatalog(name);
tEnv.useDatabase(“flink_test”);
tEnv.sqlQuery(“select * from stu”).execute().print();
函数(function)
Flink 允许用户在 Table API 和 SQL 中使用函数进行数据的转换。
内置函数
Flink Table API和SQL给用户提供了大量的函数用于数据转换.
Scalar Functions(标量函数)
输入: 0个1个或多个
输出: 1个
Comparison Functions(比较函数)
Logical Functions(逻辑函数)
Aggregate Functions(聚合函数)
其他所有内置函数
参考官网: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html
自定义函数
自定义函数(UDF)是一种扩展开发机制,可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。
自定义函数分类:
标量函数(Scalar functions) 将标量值转换成一个新标量值;
表值函数(Table functions) 将标量值转换成新的行数据;
聚合函数(Aggregate functions) 将多行数据里的标量值转换成一个新标量值;
表值聚合函数(Table aggregate) 将多行数据里的标量值转换成新的行数据;
异步表值函数(Async table functions) 是异步查询外部数据系统的特殊函数。
函数用于 SQL 查询前要先经过注册;而在用于 Table API 时,函数可以先注册后调用,也可以 内联 后直接使用。
标量函数
介绍
用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值。
为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval(直接def声明,没有override)。求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。
定义函数
// 定义一个可以把字符串转成大写标量函数
public static class ToUpperCase extends ScalarFunction {
public String eval(String s){
return s.toUpperCase();
}
}
在TableAPI中使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource stream = env.fromElements(“hello”, “atguigu”, “Hello”);
Table table = tEnv.fromDataStream(stream, $(“word”));
// 1. table api 使用方式1: 不注册直接 inline 使用
table.select(call(ToUpperCase.class, $(“word”)).as(“word_upper”)).execute().print();
// 2. table api 使用方式2: 注册后使用
// 2.1 注册函数
tEnv.createTemporaryFunction(“toUpper”, ToUpperCase.class);
// 2.2 使用函数
table.select(call(“toUpper”, $(“word”)).as(“word_upper”)).execute().print();
在SQL中使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource stream = env.fromElements(“hello”, “atguigu”, “Hello”);
Table table = tEnv.fromDataStream(stream, $(“word”));
// 1. 注册临时函数
tEnv.createTemporaryFunction(“toUpper”, ToUpperCase.class);
// 2. 注册临时表
tEnv.createTemporaryView(“t_word”, table);
// 3. 在临时表上使用自定义函数查询
tEnv.sqlQuery(“select toUpper(word) word_upper from t_word”).execute().print();
表值函数
介绍
跟自定义标量函数一样,自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列,如果输出行只包含 1 列,会省略结构化信息并生成标量值,这个标量值在运行阶段会隐式地包装进行里。
要定义一个表值函数,你需要扩展 org.apache.flink.table.functions 下的 TableFunction,可以通过实现多个名为 eval 的方法对求值方法进行重载。像其他函数一样,输入和输出类型也可以通过反射自动提取出来。表值函数返回的表的类型取决于 TableFunction 类的泛型参数 T,不同于标量函数,表值函数的求值方法本身不包含返回类型,而是通过 collect(T) 方法来发送要输出的行。
在 Table API 中,表值函数是通过 .joinLateral(…) 或者 .leftOuterJoinLateral(…) 来使用的。joinLateral 算子会把外表(算子左侧的表)的每一行跟跟表值函数返回的所有行(位于算子右侧)进行 (cross)join。leftOuterJoinLateral 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。
在 SQL 里面用 JOIN 或者 以 ON TRUE 为条件的 LEFT JOIN 来配合 LATERAL TABLE() 的使用。
其实就是以前的UDTF函数
定义函数
@FunctionHint(output = @DataTypeHint(“ROW(word string, len int)”))
public static class Split extends TableFunction {
public void eval(String line) {
if (line.length() == 0) {
return;
}
for (String s : line.split(“,”)) {
// 来一个字符串, 按照逗号分割, 得到多行, 每行为这个单词和他的长度
collect(Row.of(s, s.length()));
}
}
}
在TableAPI中使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource stream = env.fromElements(“hello,atguigu,world”, “aaa,bbbbb”, “”);
Table table = tEnv.fromDataStream(stream, $(“line”));
// 1. 内联使用
table
.joinLateral(call(Split.class,
(
"
l
i
n
e
"
)
)
)
.
s
e
l
e
c
t
(
("line"))) .select(
("line"))).select((“line”), $(“word”), $(“len”))
.execute()
.print();
table
.leftOuterJoinLateral(call(Split.class,
(
"
l
i
n
e
"
)
)
)
.
s
e
l
e
c
t
(
("line"))) .select(
("line"))).select((“line”), $(“word”), $(“len”))
.execute()
.print();
// 2. 注册后使用
tEnv.createTemporaryFunction(“split”, Split.class);
table
.joinLateral(call(“split”,
(
"
l
i
n
e
"
)
)
)
.
s
e
l
e
c
t
(
("line"))) .select(
("line"))).select((“line”), $(“word”), $(“len”))
.execute()
.print();
在SQL中使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource stream = env.fromElements(“hello,atguigu,world”, “aaa,bbbbb”, “”);
Table table = tEnv.fromDataStream(stream, $(“line”));
// 1. 注册表
tEnv.createTemporaryView(“t_word”, table);
// 2. 注册函数
tEnv.createTemporaryFunction(“split”, Split.class);
// 3. 使用函数
// 3.1 join
tEnv.sqlQuery("select " +
" line, word, len " +
"from t_word " +
“join lateral table(split(line)) on true”).execute().print();
// 或者
tEnv.sqlQuery("select " +
" line, word, len " +
"from t_word, " +
“lateral table(split(line))”).execute().print();
// 3.2 left join
tEnv.sqlQuery("select " +
" line, word, len " +
"from t_word " +
“left join lateral table(split(line)) on true”).execute().print();
// 3.3 join或者left join给字段重命名
tEnv.sqlQuery("select " +
" line, new_word, new_len " +
"from t_word " +
“left join lateral table(split(line)) as T(new_word, new_len) on true”).execute().print();
聚合函数
介绍
用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的。
上图中显示了一个聚合的例子。
假设现在有一张表,包含了各种饮料的数据。该表由三列(id、name和price)、五行组成数据。现在我们需要找到表中所有饮料的最高价格,即执行max()聚合,结果将是一个数值。
AggregateFunction的工作原理如下。
首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过调用AggregateFunction的createAccumulator()方法创建空累加器。
随后,对每个输入行调用函数的accumulate()方法来更新累加器。
处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果。
AggregationFunction要求必须实现的方法:
createAccumulator()
accumulate()
getValue()
除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口(session group window)的上下文中,则merge()方法是必需的。
retract() 在 bounded OVER 窗口中是必须实现的。
merge()
resetAccumulator() 在许多批式聚合中是必须实现的。
定义函数
定义一个计算sensor平均温度的函数
// 累加器类型
public static class VcAvgAcc {
public Integer sum = 0;
public Long count = 0L;
}
public static class VcAvg extends AggregateFunction<Double, VcAvgAcc> {
// 返回最终的计算结果
@Override
public Double getValue(VcAvgAcc accumulator) {
return accumulator.sum * 1.0 / accumulator.count;
}
// 初始化累加器
@Override
public VcAvgAcc createAccumulator() {
return new VcAvgAcc();
}
// 处理输入的值, 更新累加器
// 参数1: 累加器
// 参数2,3,...: 用户自定义的输入值
public void accumulate(VcAvgAcc acc, Integer vc) {
acc.sum += vc;
acc.count += 1L;
}
}
在Table API中使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource waterSensorStream =
env.fromElements(new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60));
Table table = tEnv.fromDataStream(waterSensorStream);
// 1. 内联使用
table
.groupBy(
(
"
i
d
"
)
)
.
s
e
l
e
c
t
(
("id")) .select(
("id")).select((“id”), call(VcAvg.class, $(“vc”)))
.execute()
.print();
// 2. 注册后使用
tEnv.createTemporaryFunction(“my_avg”, VcAvg.class);
table
.groupBy(
(
"
i
d
"
)
)
.
s
e
l
e
c
t
(
("id")) .select(
("id")).select((“id”), call(“my_avg”, $(“vc”)))
.execute()
.print();
在SQL中使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource waterSensorStream =
env.fromElements(new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60));
Table table = tEnv.fromDataStream(waterSensorStream);
// 在sql中使用
// 1. 注册表
tEnv.createTemporaryView(“t_sensor”, table);
// 2. 注册函数
tEnv.createTemporaryFunction(“my_avg”, VcAvg.class);
// 3. sql中使用自定义聚合函数
tEnv.sqlQuery(“select id, my_avg(vc) from t_sensor group by id”).execute().print();
表值聚合函数
介绍
自定义表值聚合函数(UDTAGG)可以把一个表(一行或者多行,每行有一列或者多列)聚合成另一张表,结果中可以有多行多列。
比如现在我们需要找到表中所有饮料的前2个最高价格,即执行top2()表聚合。我们需要检查5行中的每一行,得到的结果将是一个具有排序后前2个值的表。
用户定义的表聚合函数,是通过继承TableAggregateFunction抽象类来实现的。
TableAggregateFunction的工作原理如下。
首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用TableAggregateFunction的createAccumulator()方法可以创建空累加器。
随后,对每个输入行调用函数的accumulate()方法来更新累加器。
处理完所有行后,将调用函数的emitValue()方法来计算并返回最终结果。
AggregationFunction要求必须实现的方法:
createAccumulator()
accumulate()
除了上述方法之外,还有一些可选择实现的方法。
retract()
merge()
resetAccumulator()
emitValue()
emitUpdateWithRetract()
定义函数
接下来我们写一个自定义TableAggregateFunction,用来提取每个sensor最高的两个温度值。
// 累加器
public static class Top2Acc {
public Integer first = Integer.MIN_VALUE; // top 1
public Integer second = Integer.MIN_VALUE; // top 2
}
// Tuple2<Integer, Integer> 值和排序
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Acc> {
@Override
public Top2Acc createAccumulator() {
return new Top2Acc();
}
public void accumulate(Top2Acc acc, Integer vc) {
if (vc > acc.first) {
acc.second = acc.first;
acc.first = vc;
} else if (vc > acc.second) {
acc.second = vc;
}
}
public void emitValue(Top2Acc acc, Collector<Tuple2<Integer, Integer>> out) {
if (acc.first != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
}
在Table API中使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource waterSensorStream =
env.fromElements(new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60));
Table table = tEnv.fromDataStream(waterSensorStream);
// 1. 内联使用
table
.groupBy($(“id”))
.flatAggregate(call(Top2.class,
(
"
v
c
"
)
)
.
a
s
(
"
v
"
,
"
r
a
n
k
"
)
)
.
s
e
l
e
c
t
(
("vc")).as("v", "rank")) .select(
("vc")).as("v","rank")).select((“id”), $(“v”),
(
"
r
a
n
k
"
)
)
.
e
x
e
c
u
t
e
(
)
.
p
r
i
n
t
(
)
;
/
/
2.
注
册
后
使
用
t
E
n
v
.
c
r
e
a
t
e
T
e
m
p
o
r
a
r
y
F
u
n
c
t
i
o
n
(
"
t
o
p
2
"
,
T
o
p
2.
c
l
a
s
s
)
;
t
a
b
l
e
.
g
r
o
u
p
B
y
(
("rank")) .execute() .print(); // 2. 注册后使用 tEnv.createTemporaryFunction("top2", Top2.class); table .groupBy(
("rank")).execute().print();//2.注册后使用tEnv.createTemporaryFunction("top2",Top2.class);table.groupBy((“id”))
.flatAggregate(call(“top2”,
(
"
v
c
"
)
)
.
a
s
(
"
v
"
,
"
r
a
n
k
"
)
)
.
s
e
l
e
c
t
(
("vc")).as("v", "rank")) .select(
("vc")).as("v","rank")).select((“id”), $(“v”), $(“rank”))
.execute()
.print();
在SQL中使用
目前还不支持
SqlClient
启动换一个yarn-session, 然后启动一个sql客户端.
bin/sql-client.sh embedded
建立到Kafka的连接
下面创建一个流表从Kafka读取数据
copy 依赖到 flink的lib 目录下 flink-sql-connector-kafka_2.11-1.12.0.jar 下载地址: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.0/flink-sql-connector-kafka_2.11-1.12.0.jar
create table sensor(id string, ts bigint, vc int)
with(
‘connector’=‘kafka’,
‘topic’=‘flink_sensor’,
‘properties.bootstrap.servers’=‘hadoop162:9092’,
‘properties.group.id’=‘atguigu’,
‘format’=‘json’,
‘scan.startup.mode’=‘latest-offset’
)
从流表查询数据
select * from sensor;
向Kafka写入数据: {“id”: “sensor1”, “ts”: 1000, “vc”: 10}
建立到mysql的连接
依赖: flink-connector-jdbc_2.11-1.12.0.jar
下载地址: https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.12.0/flink-connector-jdbc_2.11-1.12.0.jar
copy mysql驱动到lib目录
create table sensor(id string, ts bigint, vc int)
with(
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:mysql://hadoop162:3306/gmall’,
‘username’=‘root’,
‘password’=‘aaaaaa’,
‘table-name’ = ‘sensor’
)
Flink SQL编程实战
使用SQL实现热门商品TOP N
目前仅 Blink 计划器支持 Top-N 。
Flink 使用 OVER 窗口条件和过滤条件相结合以进行 Top-N 查询。利用 OVER 窗口的 PARTITION BY 子句的功能,Flink 还支持逐组 Top-N 。 例如,每个类别中实时销量最高的前五种产品。批处理表和流处理表都支持基于SQL的 Top-N 查询。
流处理模式需注意: TopN 查询的结果会带有更新。 Flink SQL 会根据排序键对输入的流进行排序;若 top N 的记录发生了变化,变化的部分会以撤销、更新记录的形式发送到下游。 推荐使用一个支持更新的存储作为 Top-N 查询的 sink 。另外,若 top N 记录需要存储到外部存储,则结果表需要拥有与 Top-N 查询相同的唯一键。
需求描述
每隔10min 统计最近 1hour的热门商品 top3, 并把统计的结果写入到mysql中
思路:
统计每个商品的点击量, 开窗
分组窗口分组,
over窗口
数据源
input/UserBehavior.csv
在mysql中创建表
CREATE DATABASE flink_sql;
USE flink_sql;
DROP TABLE IF EXISTS hot_item
;
CREATE TABLE hot_item
(
w_end
timestamp NOT NULL,
item_id
bigint(20) NOT NULL,
item_count
bigint(20) NOT NULL,
rk
bigint(20) NOT NULL,
PRIMARY KEY (w_end
,rk
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
导入JDBC Connector依赖
org.apache.flink
flink-connector-jdbc_
s
c
a
l
a
.
b
i
n
a
r
y
.
v
e
r
s
i
o
n
<
/
a
r
t
i
f
a
c
t
I
d
>
<
v
e
r
s
i
o
n
>
{scala.binary.version}</artifactId> <version>
scala.binary.version</artifactId><version>{flink.version}
具体实现代码
package com.atguigu.flink.java.chapter_12;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/31 9:11
*/
public class Flink01_HotItem_TopN {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 使用sql从文件读取数据 tenv.executeSql( "create table user_behavior(" + " user_id bigint, " + " item_id bigint, " + " category_id int, " + " behavior string, " + " ts bigint, " + " event_time as to_timestamp(from_unixtime(ts, 'yyyy-MM-dd HH:mm:ss')), " + " watermark for event_time as event_time - interval '5' second " + ")with(" + " 'connector'='filesystem', " + " 'path'='input/UserBehavior.csv', " + " 'format'='csv')" ); // 每隔 10m 统计一次最近 1h 的热门商品 top // 1. 计算每每个窗口内每个商品的点击量 Table t1 = tenv .sqlQuery( "select " + " item_id, " + " hop_end(event_time, interval '10' minute, interval '1' hour) w_end," + " count(*) item_count " + "from user_behavior " + "where behavior='pv' " + "group by hop(event_time, interval '10' minute, interval '1' hour), item_id" ); tenv.createTemporaryView("t1", t1); // 2. 按照窗口开窗, 对商品点击量进行排名 Table t2 = tenv.sqlQuery( "select " + " *," + " row_number() over(partition by w_end order by item_count desc) rk " + "from t1" ); tenv.createTemporaryView("t2", t2); // 3. 取 top3 Table t3 = tenv.sqlQuery( "select " + " item_id, w_end, item_count, rk " + "from t2 " + "where rk<=3" ); // 4. 数据写入到mysql // 4.1 创建输出表 tenv.executeSql("create table hot_item(" + " item_id bigint, " + " w_end timestamp(3), " + " item_count bigint, " + " rk bigint, " + " PRIMARY KEY (w_end, rk) NOT ENFORCED)" + "with(" + " 'connector' = 'jdbc', " + " 'url' = 'jdbc:mysql://hadoop162:3306/flink_sql?useSSL=false', " + " 'table-name' = 'hot_item', " + " 'username' = 'root', " + " 'password' = 'aaaaaa' " + ")"); // 4.2 写入到输出表 t3.executeInsert("hot_item");
}
}
一些补充知识
双流join
在Flink中, 支持两种方式的流的Join: Window Join和Interval Join
Window Join
窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素.
注意:
所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会处理(就是忽略掉了)
join成功后的元素的会以所在窗口的最大时间作为其时间戳. 例如窗口[5,10), 则元素会以9作为自己的时间戳
滚动窗口Join
package com.atguigu.flink.java.chapter_12;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/24 22:09
*/
public class Flink01_Join_Window_Tumbling {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> s1 = env .socketTextStream("hadoop162", 8888) // 在socket终端只输入毫秒级别的时间戳 .map(value -> { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); }) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } }) ); SingleOutputStreamOperator<WaterSensor> s2 = env .socketTextStream("hadoop162", 9999) // 在socket终端只输入毫秒级别的时间戳 .map(value -> { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); }) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } }) ); s1.join(s2) .where(WaterSensor::getId) .equalTo(WaterSensor::getId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 必须使用窗口 .apply(new JoinFunction<WaterSensor, WaterSensor, String>() { @Override public String join(WaterSensor first, WaterSensor second) throws Exception { return "first: " + first + ", second: " + second; } }) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
}
}
滑动窗口Join
会话窗口Join
Interval Join
间隔流join(Interval Join), 是指使用一个流的数据按照key去join另外一条流的指定范围的数据.
如下图: 橙色的流去join绿色的流.范围是由橙色流的event-time + lower bound和event-time + upper bound来决定的.
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
注意:
Interval Join只支持event-time
必须是keyBy之后的流才可以interval join
package com.atguigu.flink.java.chapter_12;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/24 23:11
*/
public class Flink01_Join_Interval {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> s1 = env .socketTextStream("hadoop162", 8888) // 在socket终端只输入毫秒级别的时间戳 .map(value -> { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); }) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } }) ); SingleOutputStreamOperator<WaterSensor> s2 = env .socketTextStream("hadoop162", 9999) // 在socket终端只输入毫秒级别的时间戳 .map(value -> { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); }) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } }) ); s1 .keyBy(WaterSensor::getId) .intervalJoin(s2.keyBy(WaterSensor::getId)) // 指定上下界 .between(Time.seconds(-2), Time.seconds(3)) .process(new ProcessJoinFunction<WaterSensor, WaterSensor, String>() { @Override public void processElement(WaterSensor left, WaterSensor right, Context ctx, Collector<String> out) throws Exception { out.collect(left + "," + right); } }) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
}
}海量数据实时去重
方案1: 借助redis的Set
具体实现代码
略
缺点
需要频繁连接Redis
如果数据量过大, 对redis的内存也是一种压力
方案2: 使用Flink的MapState
具体实现代码
参考:案例5:MapState
缺点
如果数据量过大, 状态后端最好选择 RocksDBStateBackend
如果数据量过大, 对存储也有一定压力
方案3: 使用布隆过滤器
布隆过滤器可以大大减少存储的数据的数据量
布隆过滤器
为什么需要布隆过滤器
如果想判断一个元素是不是在一个集合里,一般想到的是将集合中所有元素保存起来,然后通过比较确定。链表、树、散列表(又叫哈希表,Hash table)等等数据结构都是这种思路。
但是随着集合中元素的增加,我们需要的存储空间越来越大。同时检索速度也越来越慢,上述三种结构的检索时间复杂度分别为O(n),O(logn),O(1)。
布隆过滤器即可以解决存储空间的问题, 又可以解决时间复杂度的问题.
布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。
基本概念
布隆过滤器(Bloom Filter,下文简称BF)由Burton Howard Bloom在1970年提出,是一种空间效率高的概率型数据结构。它专门用来检测集合中是否存在特定的元素。
它实际上是一个很长的二进制向量和一系列随机映射函数。
实现原理
布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。
BF是由一个长度为m比特的位数组(bit array)与k个哈希函数(hash function)组成的数据结构。位数组均初始化为0,所有哈希函数都可以分别把输入数据尽量均匀地散列。
当要插入一个元素时,将其数据分别输入k个哈希函数,产生k个哈希值。以哈希值作为位数组中的下标,将所有k个对应的比特置为1。
当要查询(即判断是否存在)一个元素时,同样将其数据输入哈希函数,然后检查对应的k个比特。如果有任意一个比特为0,表明该元素一定不在集合中。如果所有比特均为1,表明该集合有(较大的)可能性在集合中。为什么不是一定在集合中呢?因为一个比特被置为1有可能会受到其他元素的影响(hash碰撞),这就是所谓“假阳性”(false positive)。相对地,“假阴性”(false negative)在BF中是绝不会出现的。
下图示出一个m=18, k=3的BF示例。集合中的x、y、z三个元素通过3个不同的哈希函数散列到位数组中。当查询元素w时,因为有一个比特为0,因此w不在该集合中。
优点
不需要存储数据本身,只用比特表示,因此空间占用相对于传统方式有巨大的优势,并且能够保密数据;
时间效率也较高,插入和查询的时间复杂度均为O(k), 所以他的时间复杂度实际是O(1)
哈希函数之间相互独立,可以在硬件指令层面并行计算。
缺点
存在假阳性的概率,不适用于任何要求100%准确率的情境;
只能插入和查询元素,不能删除元素,这与产生假阳性的原因是相同的。我们可以简单地想到通过计数(即将一个比特扩展为计数值)来记录元素数,但仍然无法保证删除的元素一定在集合中。
使用场景
所以,BF在对查准度要求没有那么苛刻,而对时间、空间效率要求较高的场合非常合适.
另外,由于它不存在假阴性问题,所以用作“不存在”逻辑的处理时有奇效,比如可以用来作为缓存系统(如Redis)的缓冲,防止缓存穿透。
假阳性概率的计算
假阳性的概率其实就是一个不在的元素,被k个函数函数散列到的k个位置全部都是1的概率。可以按照如下的步骤进行计算: p = f(m,n,k)
其中各个字母的含义:
n :放入BF中的元素的总个数;
m:BF的总长度,也就是bit数组的个数
k:哈希函数的个数;
p:表示BF将一个不在其中的元素错判为在其中的概率,也就是false positive的概率;
BF中的任何一个bit在第一个元素的第一个hash函数执行完之后为 0的概率是:
1-1/m
BF中的任何一个bit在第一个元素的k个hash函数执行完之后为 0的概率是:
〖(1-1/m)〗^k
BF中的任何一个bit在所有的n元素都添加完之后为 0的概率是:
〖(1-1/m)〗^kn
BF中的任何一个bit在所有的n元素都添加完之后为 1的概率是:
〖1-(1-1/m)〗^kn
一个不存在的元素被k个hash函数映射后k个bit都是1的概率是:
p=〖[〖1-(1-1/m)〗kn]〗k≈(1-e^((-kn)/m) )k=(1-1/e(kn/m) )^k
结论:在哈数函数个数k一定的情况下
比特数组m长度越大, p越小, 表示假阳性率越低
已插入的元素个数n越大, p越大, 表示假阳性率越大
经过各种数学推导:
对于给定的m和n,使得假阳性率(误判率)最小的k通过如下公式定义:
k=m/n ln2≈0.7 m/n
使用布隆过滤器实现去重
Flink已经内置了布隆过滤器的实现(使用的是google的Guava)
package com.atguigu.flink.java.chapter_13;
import com.atguigu.flink.java.chapter_6.UserBehavior;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
-
@Author lizhenchao@atguigu.cn
-
@Date 2021/1/4 15:27
*/
public class Flink02_UV_BoomFilter {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建WatermarkStrategy WatermarkStrategy<UserBehavior> wms = WatermarkStrategy .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() { @Override public long extractTimestamp(UserBehavior element, long recordTimestamp) { return element.getTimestamp() * 1000L; } }); env .readTextFile("input/UserBehavior.csv") .map(line -> { // 对数据切割, 然后封装到POJO中 String[] split = line.split(","); return new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4])); }) .filter(behavior -> "pv".equals(behavior.getBehavior())) //过滤出pv行为 .assignTimestampsAndWatermarks(wms) .keyBy(UserBehavior::getBehavior) .window(TumblingEventTimeWindows.of(Time.minutes(60))) .process(new ProcessWindowFunction<UserBehavior, String, String, TimeWindow>() { private ValueState<Long> countState; private ValueState<BloomFilter<Long>> bfState; @Override public void open(Configuration parameters) throws Exception { countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class)); bfState = getRuntimeContext() .getState(new ValueStateDescriptor<BloomFilter<Long>>("bfState", TypeInformation.of(new TypeHint<BloomFilter<Long>>() {}) ) ); } @Override public void process(String key, Context context, Iterable<UserBehavior> elements, Collector<String> out) throws Exception { countState.update(0L); // 在状态中初始化一个布隆过滤器 // 参数1: 漏斗, 存储的类型 // 参数2: 期望插入的元素总个数 // 参数3: 期望的误判率(假阳性率) BloomFilter<Long> bf = BloomFilter.create(Funnels.longFunnel(), 1000000, 0.001); bfState.update(bf); for (UserBehavior behavior : elements) { // 查布隆 if (!bfState.value().mightContain(behavior.getUserId())) { // 不存在 计数+1 countState.update(countState.value() + 1L); // 记录这个用户di, 表示来过 bfState.value().put(behavior.getUserId()); } } out.collect("窗口: " + context.window() + " 的uv是: " + countState.value()); } }) .print(); env.execute();
}
}