第09讲:Flink 状态与容错
这一课时我们主要讲解 Flink 的状态和容错。
在 Flink 的框架中,进行有状态的计算是 Flink 最重要的特性之一。所谓的状态,其实指的是 Flink 程序的中间计算结果。Flink 支持了不同类型的状态,并且针对状态的持久化还提供了专门的机制和状态管理器。
状态
我们在 Flink 的官方博客中找到这样一段话,可以认为这是对状态的定义:
这段话告诉我们,所谓的状态指的是,在流处理过程中那些需要记住的数据,而这些数据既可以包括业务数据,也可以包括元数据。Flink 本身提供了不同的状态管理器来管理状态,并且这个状态可以非常大。
Flink 的状态数据可以存在 JVM 的堆内存或者堆外内存中,当然也可以借助第三方存储,例如 Flink 已经实现的对 RocksDB 支持。Flink 的官网同样给出了适用于状态计算的几种情况:
- When an application searches for certain event patterns, the state will store the sequence of events encountered so far
- When aggregating events per minute/hour/day, the state holds the pending aggregates
- When training a machine learning model over a stream of data points, the state holds the current version of the model parameters
- When historic data needs to be managed, the state allows efficient access to events that occurred in the past
以上四种情况分别是:复杂事件处理获取符合某一特定时间规则的事件、聚合计算、机器学习的模型训练、使用历史的数据进行计算。
Flink 状态分类和使用
我们在之前的课时中提到过 KeyedStream 的概念,并且介绍过 KeyBy 这个算子的使用。在 Flink 中,根据数据集是否按照某一个 Key 进行分区,将状态分为 Keyed State 和 Operator State(Non-Keyed State)两种类型。
如上图所示,Keyed State 是经过分区后的流上状态,每个 Key 都有自己的状态,图中的八边形、圆形和三角形分别管理各自的状态,并且只有指定的 key 才能访问和更新自己对应的状态。
与 Keyed State 不同的是,Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。每个算子子任务上的数据共享自己的状态。
但是有一点需要说明的是,无论是 Keyed State 还是 Operator State,Flink 的状态都是基于本地的,即每个算子子任务维护着这个算子子任务对应的状态存储,算子子任务之间的状态不能相互访问。
我们可以看一下 State 的类图,对于 Keyed State,Flink 提供了几种现成的数据结构供我们使用,State 主要有四种实现,分别为 ValueState、MapState、AppendingState 和 ReadOnlyBrodcastState ,其中 AppendingState 又可以细分为 ReducingState、AggregatingState 和 ListState。
那么我们怎么访问这些状态呢?Flink 提供了 StateDesciptor 方法专门用来访问不同的 state,类图如下:
下面演示一下如何使用 StateDesciptor 和 ValueState,代码如下:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 5L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.printToErr();
env.execute(<span class="hljs-string">"submit job"</span>);
}
public static class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
<span class="hljs-keyword">private</span> <span class="hljs-keyword">transient</span> ValueState<Tuple2<Long, Long>> sum;
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">flatMap</span><span class="hljs-params">(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out)</span> <span class="hljs-keyword">throws</span> Exception </span>{
Tuple2<Long, Long> currentSum;
<span class="hljs-comment">// 访问ValueState</span>
<span class="hljs-keyword">if</span>(sum.value()==<span class="hljs-keyword">null</span>){
currentSum = Tuple2.of(<span class="hljs-number">0L</span>, <span class="hljs-number">0L</span>);
}<span class="hljs-keyword">else</span> {
currentSum = sum.value();
}
<span class="hljs-comment">// 更新</span>
currentSum.f0 += <span class="hljs-number">1</span>;
<span class="hljs-comment">// 第二个元素加1</span>
currentSum.f1 += input.f1;
<span class="hljs-comment">// 更新state</span>
sum.update(currentSum);
<span class="hljs-comment">// 如果count的值大于等于2,求知道并清空state</span>
<span class="hljs-keyword">if</span> (currentSum.f0 >= <span class="hljs-number">2</span>) {
out.collect(<span class="hljs-keyword">new</span> Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
“average”, // state的名字
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
); // 设置默认值
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(<span class="hljs-number">10</span>))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
descriptor.enableTimeToLive(ttlConfig);
sum = getRuntimeContext().getState(descriptor);
}
}
在上述例子中,我们通过继承 RichFlatMapFunction 来访问 State,通过 getRuntimeContext().getState(descriptor) 来获取状态的句柄。而真正的访问和更新状态则在 Map 函数中实现。
我们这里的输出条件为,每当第一个元素的和达到二,就把第二个元素的和与第一个元素的和相除,最后输出。我们直接运行,在控制台可以看到结果:
Operator State 的实际应用场景不如 Keyed State 多,一般来说它会被用在 Source 或 Sink 等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证 Flink 应用的 Exactly-Once 语义。
同样,我们对于任何状态数据还可以设置它们的过期时间。如果一个状态设置了 TTL,并且已经过期,那么我们之前保存的值就会被清理。
想要使用 TTL,我们需要首先构建一个 StateTtlConfig 配置对象;然后,可以通过传递配置在任何状态描述符中启用 TTL 功能。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
descriptor.enableTimeToLive(ttlConfig);
StateTtlConfig 这个类中有一些配置需要我们注意:
UpdateType 表明了过期时间什么时候更新,而对于那些过期的状态,是否还能被访问则取决于 StateVisibility 的配置。
状态后端种类和配置
我们在上面的内容中讲到了 Flink 的状态数据可以存在 JVM 的堆内存或者堆外内存中,当然也可以借助第三方存储。默认情况下,Flink 的状态会保存在 taskmanager 的内存中,Flink 提供了三种可用的状态后端用于在不同情况下进行状态后端的保存。
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
MemoryStateBackend
顾名思义,MemoryStateBackend 将 state 数据存储在内存中,一般用来进行本地调试用,我们在使用 MemoryStateBackend 时需要注意的一些点包括:
MemoryStateBackend 可以通过在代码中显示指定:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend(DEFAULT_MAX_STATE_SIZE,false));
其中,new MemoryStateBackend(DEFAULT_MAX_STATE_SIZE,false) 中的 false 代表关闭异步快照机制。关于快照,我们在后面的课时中有单独介绍。
很明显 MemoryStateBackend 适用于我们本地调试使用,来记录一些状态很小的 Job 状态信息。
FsStateBackend
FsStateBackend 会把状态数据保存在 TaskManager 的内存中。CheckPoint 时,将状态快照写入到配置的文件系统目录中,少量的元数据信息存储到 JobManager 的内存中。
使用 FsStateBackend 需要我们指定一个文件路径,一般来说是 HDFS 的路径,例如,hdfs://namenode:40010/flink/checkpoints。
我们同样可以在代码中显示指定:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints", false));
FsStateBackend 因为将状态存储在了外部系统如 HDFS 中,所以它适用于大作业、状态较大、全局高可用的那些任务。
RocksDBStateBackend
RocksDBStateBackend 和 FsStateBackend 有一些类似,首先它们都需要一个外部文件存储路径,比如 HDFS 的 hdfs://namenode:40010/flink/checkpoints,此外也适用于大作业、状态较大、全局高可用的那些任务。
但是与 FsStateBackend 不同的是,RocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 运行节点的数据目录下。
这意味着,RocksDBStateBackend 可以存储远超过 FsStateBackend 的状态,可以避免向 FsStateBackend 那样一旦出现状态暴增会导致 OOM,但是因为将状态数据保存在 RocksDB 数据库中,吞吐量会有所下降。
此外,需要注意的是,RocksDBStateBackend 是唯一支持增量快照的状态后端。
总结
我们在这一课时中讲解了 Flink 中的状态分类和使用,并且用实际案例演示了用法;此外介绍了 Flink 状态的保存方式和优缺点。
点击这里下载本课程源码
第10讲:Flink Side OutPut 分流
这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。
分流场景
我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?
分流的方法
通常来说针对不同的场景,有以下三种办法进行流的拆分。
Filter 分流
Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。
来看下面的例子:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
<span class="hljs-comment">//获取数据源</span>
List data = <span class="hljs-keyword">new</span> ArrayList<Tuple3<Integer,Integer,Integer>>();
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">0</span>,<span class="hljs-number">1</span>,<span class="hljs-number">0</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">0</span>,<span class="hljs-number">1</span>,<span class="hljs-number">1</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">0</span>,<span class="hljs-number">2</span>,<span class="hljs-number">2</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">0</span>,<span class="hljs-number">1</span>,<span class="hljs-number">3</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">5</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">9</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">11</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">13</span>));
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == <span class="hljs-number">0</span>);
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == <span class="hljs-number">1</span>);
zeroStream.print();
oneStream.printToErr();
<span class="hljs-comment">//打印结果</span>
String jobName = <span class="hljs-string">"user defined streaming source"</span>;
env.execute(jobName);
}
在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。
可以看到 zeroStream 和 oneStream 分别被打印出来。
Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。
Split 分流
Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。
我们来看下面的例子:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
<span class="hljs-comment">//获取数据源</span>
List data = <span class="hljs-keyword">new</span> ArrayList<Tuple3<Integer,Integer,Integer>>();
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">0</span>,<span class="hljs-number">1</span>,<span class="hljs-number">0</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">0</span>,<span class="hljs-number">1</span>,<span class="hljs-number">1</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">0</span>,<span class="hljs-number">2</span>,<span class="hljs-number">2</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">0</span>,<span class="hljs-number">1</span>,<span class="hljs-number">3</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">5</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">9</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">11</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">13</span>));
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(<span class="hljs-keyword">new</span> OutputSelector<Tuple3<Integer, Integer, Integer>>() {
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> Iterable<String> <span class="hljs-title">select</span><span class="hljs-params">(Tuple3<Integer, Integer, Integer> value)</span> </span>{
List<String> tags = <span class="hljs-keyword">new</span> ArrayList<>();
<span class="hljs-keyword">if</span> (value.f0 == <span class="hljs-number">0</span>) {
tags.add(<span class="hljs-string">"zeroStream"</span>);
} <span class="hljs-keyword">else</span> <span class="hljs-keyword">if</span> (value.f0 == <span class="hljs-number">1</span>) {
tags.add(<span class="hljs-string">"oneStream"</span>);
}
<span class="hljs-keyword">return</span> tags;
}
});
splitStream.select(<span class="hljs-string">"zeroStream"</span>).print();
splitStream.select(<span class="hljs-string">"oneStream"</span>).printToErr();
<span class="hljs-comment">//打印结果</span>
String jobName = <span class="hljs-string">"user defined streaming source"</span>;
env.execute(jobName);
}
同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。
但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。
Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。
SideOutPut 分流
SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:
- 定义 OutputTag
- 调用特定函数进行数据拆分
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
<span class="hljs-comment">//获取数据源</span>
List data = <span class="hljs-keyword">new</span> ArrayList<Tuple3<Integer,Integer,Integer>>();
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">0</span>,<span class="hljs-number">1</span>,<span class="hljs-number">0</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">0</span>,<span class="hljs-number">1</span>,<span class="hljs-number">1</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">0</span>,<span class="hljs-number">2</span>,<span class="hljs-number">2</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">0</span>,<span class="hljs-number">1</span>,<span class="hljs-number">3</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">5</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">9</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">11</span>));
data.add(<span class="hljs-keyword">new</span> Tuple3<>(<span class="hljs-number">1</span>,<span class="hljs-number">2</span>,<span class="hljs-number">13</span>));
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = <span class="hljs-keyword">new</span> OutputTag<Tuple3<Integer,Integer,Integer>>(<span class="hljs-string">"zeroStream"</span>) {};
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = <span class="hljs-keyword">new</span> OutputTag<Tuple3<Integer,Integer,Integer>>(<span class="hljs-string">"oneStream"</span>) {};
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(<span class="hljs-keyword">new</span> ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">processElement</span><span class="hljs-params">(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out)</span> <span class="hljs-keyword">throws</span> Exception </span>{
<span class="hljs-keyword">if</span> (value.f0 == <span class="hljs-number">0</span>) {
ctx.output(zeroStream, value);
} <span class="hljs-keyword">else</span> <span class="hljs-keyword">if</span> (value.f0 == <span class="hljs-number">1</span>) {
ctx.output(oneStream, value);
}
}
});
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
zeroSideOutput.print();
oneSideOutput.printToErr();
<span class="hljs-comment">//打印结果</span>
String jobName = <span class="hljs-string">"user defined streaming source"</span>;
env.execute(jobName);
}
可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是可以多次进行拆分的,无需担心会爆出异常。
总结
这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。
点击这里下载本课程源码。
第11讲:Flink CEP 复杂事件处理
你好,欢迎来到第 11 课时,这一课时将介绍 Flink 中提供的一个很重要的功能:复杂事件处理 CEP。
背景
Complex Event Processing(CEP)是 Flink 提供的一个非常亮眼的功能,关于 CEP 的解释我们引用维基百科中的一段话:
在我们的实际生产中,随着数据的实时性要求越来越高,实时数据的量也在不断膨胀,在某些业务场景中需要根据连续的实时数据,发现其中有价值的那些事件。
说到底,Flink 的 CEP 到底解决了什么样的问题呢?
比如,我们需要在大量的订单交易中发现那些虚假交易,在网站的访问日志中寻找那些使用脚本或者工具“爆破”登录的用户,或者在快递运输中发现那些滞留很久没有签收的包裹等。
如果你对 CEP 的理论基础非常感兴趣,推荐一篇论文“Efficient Pattern Matching over Event Streams”。
Flink 对 CEP 的支持非常友好,并且支持复杂度非常高的模式匹配,其吞吐和延迟都令人满意。
程序结构
Flink CEP 的程序结构主要分为两个步骤:
- 定义模式
- 匹配结果
我们在官网中可以找到一个 Flink 提供的案例:
DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
).next("middle").subtype(SubEvent.class).where(
new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
}
).followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
}
);
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.process(
new PatternProcessFunction<Event, Alert>() {
@Override
public void processMatch(
Map<String, List<Event>> pattern,
Context ctx,
Collector<Alert> out) throws Exception {
out.collect(createAlertFrom(pattern));
}
});
在这个案例中可以看到程序结构分别是:
- 第一步,定义一个模式 Pattern,在这里定义了一个这样的模式,即在所有接收到的事件中匹配那些以 id 等于 42 的事件,然后匹配 volume 大于 10.0 的事件,继续匹配一个 name 等于 end 的事件;
- 第二步,匹配模式并且发出报警,根据定义的 pattern 在输入流上进行匹配,一旦命中我们的模式,就发出一个报警。
模式定义
Flink 支持了非常丰富的模式定义,这些模式也是我们实现复杂业务逻辑的基础。我们把支持的模式简单做了以下分类,完整的模式定义 API 支持可以参考官网资料。
简单模式
联合模式
匹配后的忽略模式
源码解析
我们在上面的官网案例中可以发现,Flink CEP 的整个过程是:
- 从一个 Source 作为输入
- 经过一个 Pattern 算子转换为 PatternStream
- 经过 select/process 算子转换为 DataStream
我们来看一下 select 和 process 算子都做了什么?
可以看到最终的逻辑都是在 PatternStream 这个类中进行的。
public <R> SingleOutputStreamOperator<R> process(
final PatternProcessFunction<T, R> patternProcessFunction,
final TypeInformation<R> outTypeInfo) {
return builder.build(
outTypeInfo,
builder.clean(patternProcessFunction));
}
最终经过 PatternStreamBuilder 的 build 方法生成了一个 SingleOutputStreamOperator,这个类继承了 DataStream。
最终的处理计算逻辑其实都封装在了 CepOperator 这个类中,而在 CepOperator 这个类中的 processElement 方法则是对每一条数据的处理逻辑。
同时由于 CepOperator 实现了 Triggerable 接口,所以会执行定时器。所有核心的处理逻辑都在 updateNFA 这个方法中。
入口在这里:
private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
Collection<Map<String, List<IN>>> patterns =
nfa.process(sharedBufferAccessor, nfaState, event, timestamp, afterMatchSkipStrategy, cepTimerService);
processMatchedSequences(patterns, timestamp);
}
}
NFA 的全称为 非确定有限自动机,NFA 中包含了模式匹配中的各个状态和状态间的转换。
在 NFA 这个类中的核心方法是:process 和 advanceTime,这两个方法的实现有些复杂,总体来说可以归纳为,每当一条新来的数据进入状态机都会驱动整个状态机进行状态转换。
实战案例
我们模拟电商网站用户搜索的数据来作为数据的输入源,然后查找其中重复搜索某一个商品的人,并且发送一条告警消息。
代码如下:
public static void main(String[] args) throws Exception{
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource source = env.fromElements(
//浏览记录
Tuple3.of("Marry", "外套", 1L),
Tuple3.of("Marry", "帽子",1L),
Tuple3.of("Marry", "帽子",2L),
Tuple3.of("Marry", "帽子",3L),
Tuple3.of("Ming", "衣服",1L),
Tuple3.of("Marry", "鞋子",1L),
Tuple3.of("Marry", "鞋子",2L),
Tuple3.of("LiLei", "帽子",1L),
Tuple3.of("LiLei", "帽子",2L),
Tuple3.of("LiLei", "帽子",3L)
);
//定义Pattern,寻找连续搜索帽子的用户
Pattern<Tuple3<String, String, Long>, Tuple3<String, String, Long>> pattern = Pattern
.<Tuple3<String, String, Long>>begin("start")
.where(new SimpleCondition<Tuple3<String, String, Long>>() {
@Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception {
return value.f1.equals("帽子");
}
}) //.timesOrMore(3);
.next("middle")
.where(new SimpleCondition<Tuple3<String, String, Long>>() {
@Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception {
return value.f1.equals("帽子");
}
});
KeyedStream keyedStream = source.keyBy(<span class="hljs-number">0</span>);
PatternStream patternStream = CEP.pattern(keyedStream, pattern);
SingleOutputStreamOperator matchStream = patternStream.select(<span class="hljs-keyword">new</span> PatternSelectFunction<Tuple3<String, String, Long>, String>() {
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> String <span class="hljs-title">select</span><span class="hljs-params">(Map<String, List<Tuple3<String, String, Long>>> pattern)</span> <span class="hljs-keyword">throws</span> Exception </span>{
List<Tuple3<String, String, Long>> middle = pattern.get(<span class="hljs-string">"middle"</span>);
<span class="hljs-keyword">return</span> middle.get(<span class="hljs-number">0</span>).f0 + <span class="hljs-string">":"</span> + middle.get(<span class="hljs-number">0</span>).f2 + <span class="hljs-string">":"</span> + <span class="hljs-string">"连续搜索两次帽子!"</span>;
}
});
matchStream.printToErr();
env.execute(<span class="hljs-string">"execute cep"</span>);
}
上述代码的逻辑我们可以分解如下。
首先定义一个数据源,模拟了一些用户的搜索数据,然后定义了自己的 Pattern。这个模式的特点就是连续两次搜索商品“帽子”,然后进行匹配,发现匹配后输出一条提示信息,直接打印在控制台上。
可以看到,提示信息已经打印在了控制台上。
总结
这一课时讲解了 Flink CEP 的支持和实现,并且模拟了一下简单的电商搜索场景来实现对连续搜索某一个商品的提示,关于模式匹配还有更加复杂的应用,比如识别网站的用户的爆破登录、运维监控等,建议你结合源码和官网进行深入的学习。
点击这里下载本课程源码
第12讲:Flink 常用的 Source 和 Connector
本课时我们主要介绍 Flink 中支持的 Source 和常用的 Connector。
Flink 作为实时计算领域强大的计算能力,以及与其他系统进行对接的能力都非常强大。Flink 自身实现了多种 Source 和 Connector 方法,并且还提供了多种与第三方系统进行对接的 Connector。
我们可以把这些 Source、Connector 分成以下几个大类。
预定义和自定义 Source
在前面的第 04 课时“Flink 常用的 DataSet 和 DataStream API”中提到过几种 Flink 已经实现的新建 DataStream 方法。
基于文件
我们在本地环境进行测试时可以方便地从本地文件读取数据:
readTextFile(path)
readFile(fileInputFormat, path)
...
可以直接在 ExecutionEnvironment 和 StreamExecutionEnvironment 类中找到 Flink 支持的读取本地文件的方法,如下图所示:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// read text file from an HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.types(Integer.class, String.class, Double.class);
// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.includeFields("10010") // take the first and the fourth field
.types(String.class, Double.class);
// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.pojoType(Person.class, "name", "age", "zipcode");
基于 Collections
我们也可以基于内存中的集合、对象等创建自己的 Source。一般用来进行本地调试或者验证。
例如:
fromCollection(Collection)
fromElements(T ...)
我们也可以在源码中看到 Flink 支持的方法,如下图所示:
DataSet<String> text = env.fromElements(
"Flink Spark Storm",
"Flink Flink Flink",
"Spark Spark Spark",
"Storm Storm Storm"
);
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
data.add(new Tuple3<>(0,1,0));
data.add(new Tuple3<>(0,1,1));
data.add(new Tuple3<>(0,2,2));
data.add(new Tuple3<>(0,1,3));
data.add(new Tuple3<>(1,2,5));
data.add(new Tuple3<>(1,2,9));
data.add(new Tuple3<>(1,2,11));
data.add(new Tuple3<>(1,2,13));
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
基于 Socket
通过监听 Socket 端口,我们可以在本地很方便地模拟一个实时计算环境。
StreamExecutionEnvironment 中提供了 socketTextStream 方法可以通过 host 和 port 从一个 Socket 中以文本的方式读取数据。
DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");
自定义 Source
我们可以通过实现 Flink 的SourceFunction 或者 ParallelSourceFunction 来实现单个或者多个并行度的 Source。
例如,我们在之前的课程中用到的:
public class MyStreamingSource implements SourceFunction<Item> {
private boolean isRunning = true;
/**
* 重写run方法产生一个源源不断的数据发送源
* @param ctx
* @throws Exception
*/
public void run(SourceContext<Item> ctx) throws Exception {
while(isRunning){
Item item = generateItem();
ctx.collect(item);
//每秒产生一条数据
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}
//随机产生一条商品数据
private Item generateItem(){
int i = new Random().nextInt(100);
ArrayList<String> list = new ArrayList();
list.add("HAT");
list.add("TIE");
list.add("SHOE");
Item item = new Item();
item.setName(list.get(new Random().nextInt(3)));
item.setId(i);
return item;
}
}
自带连接器
Flink 中支持了比较丰富的用来连接第三方的连接器,可以在官网中找到 Flink 支持的各种各样的连接器:
-
Apache Kafka (source/sink)
-
Apache Cassandra (sink)
-
Amazon Kinesis Streams (source/sink)
-
Elasticsearch (sink)
-
Hadoop FileSystem (sink)
-
RabbitMQ (source/sink)
-
Apache NiFi (source/sink)
-
Twitter Streaming API (source)
-
Google PubSub (source/sink)
基于 Apache Bahir 发布
Flink 还会基于 Apache Bahir 来发布一些 Connector,比如我们常用的 Redis 等。
我们可以在 Bahir 的首页中找到目前支持的 Flink 连接器:
-
Flink streaming connector for ActiveMQ
-
Flink streaming connector for Akka
-
Flink streaming connector for Flume
-
Flink streaming connector for InfluxDB
-
Flink streaming connector for Kudu
-
Flink streaming connector for Redis
-
Flink streaming connector for Netty
其中就有我们非常熟悉的 Redis,很多同学 Flink 项目中访问 Redis 的方法都是自己进行的实现,推荐使用 Bahir 连接器。
在本地单机情况下:
public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
}
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
当然我们也可以使用在集群或者哨兵模式下使用 Redis 连接器。
集群模式:
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
哨兵模式:
FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
.setMasterName("master").setSentinels(...).build();
DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
基于异步 I/O 和可查询状态
异步 I/O 和可查询状态都是 Flink 提供的非常底层的与外部系统交互的方式。
其中异步 I/O 是为了解决 Flink 在实时计算中访问外部存储产生的延迟问题,如果我们按照传统的方式使用 MapFunction,那么所有对外部系统的访问都是同步进行的。在很多情况下,计算性能受制于外部系统的响应速度,长时间进行等待,会导致整体吞吐低下。
我们可以通过继承 RichAsyncFunction 来使用异步 I/O:
/**
* 实现 'AsyncFunction' 用于发送请求和设置回调
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** 能够利用回调函数并发发送请求的数据库客户端 */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// 发送异步请求,接收 future 结果
final Future<String> result = client.query(key);
// 设置客户端完成请求后要执行的回调函数
// 回调函数只是简单地把结果发给 future
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// 显示地处理异常
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// 创建初始 DataStream
DataStream<String> stream = ...;
// 应用异步 I/O 转换操作
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
其中,ResultFuture 的 complete 方法是异步的,不需要等待返回。
我们在之前讲解 Flink State 时,提到过 Flink 提供了 StateDesciptor 方法专门用来访问不同的 state,StateDesciptor 同时还可以通过 setQueryable 使状态变成可以查询状态。可查询状态目前是一个 Beta 功能,暂时不推荐使用。
总结
这一课时讲解了 Flink 主要支持的 Source 和 Connector,这些是我们用 Flink 访问其他系统的桥梁。本节课也为我们寻找合适的连接器指明了方向。其中最重要的 Kafka 连接器我们将会在后面的实战课时中单独讲解。
点击这里下载本课程源码