Flink ProcessFunction
文章目录
之前使用的转换算子是无法访问事件的时间戳信息和水位线信息的,在某些应用场景下,我们需要这些信息。因此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件(比如将筛选数据、超时事件等)。ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑,之前使用的window函数和转换算子无法实现。比如,FlinkSQL就是使用ProcessFunction实现的。
Flink提供了8个Process Function:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
其中最常用的是KeyedProcessFunction。
一、 KeyedProcessFunction
KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction接口,所以都有open()
、close()
和getRuntimeContext()
等方法。而KeyedProcessFunction<K, I, O>
还额外提供了两个方法:
processElement(I value, Context ctx, Collector<O> out)
,流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。Context可以访问元素的时间戳,元素的 key ,以及TimerService 时间服务。 Context 还可以将结果输出到别的流(side outputs)。onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
,是一个回调函数。当之前注册的定时器触发时调用。参数timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext和processElement的Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。
测试代码:
设置一次获取数据后第5s给出提示信息的定时器
package com.root.process;
import com.root.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* @author Kewei
* @Date 2022/3/7 9:27
*/
public class ProcessTest1_KeyedProcessFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
SingleOutputStreamOperator<Long> resultStream = dataStream.keyBy("id").process(new MyKeyedProcess());
resultStream.print();
env.execute();
}
public static class MyKeyedProcess extends KeyedProcessFunction<Tuple, SensorReading, Long>{
ValueState<Long> tsTemp;
@Override
public void open(Configuration parameters) throws Exception {
tsTemp = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ts",Long.class));
}
@Override
public void close() throws Exception {
tsTemp.clear();
}
@Override
public void processElement(SensorReading value, KeyedProcessFunction<Tuple, SensorReading, Long>.Context context, Collector<Long> out) throws Exception {
out.collect((long) value.getId().length());
context.getCurrentKey();
context.timestamp();
context.timerService().currentProcessingTime();
context.timerService().currentWatermark();
context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime() + 5000L);
tsTemp.update(context.timerService().currentProcessingTime() + 1000L);
context.timerService().deleteProcessingTimeTimer(tsTemp.value());
}
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple, SensorReading, Long>.OnTimerContext ctx, Collector<Long> out) throws Exception {
System.out.println(timestamp + " 定时器触发");
ctx.getCurrentKey();
ctx.timeDomain();
}
}
}
输出
8
1646622646820 定时器触发
二、TimerService和定时器(Timers)
Context 和OnTimerContext 所持有的TimerService 对象拥有以下方法:
long currentProcessingTime()
返回当前处理时间long currentWatermark()
返回当前watermark 的时间戳void registerProcessingTimeTimer( long timestamp)
会注册当前key的processing time的定时器。当processing time 到达定时时间时,触发timer。void registerEventTimeTimer(long timestamp)
会注册当前key 的event time 定时器。当Watermark水位线大于等于定时器注册的时间时,触发定时器执行回调函数。void deleteProcessingTimeTimer(long timestamp)
删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。void deleteEventTimeTimer(long timestamp)
删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
当定时器timer触发时,会调用回调函数onTimer()。注意定时器timer只能在keyedStreams上面使用。
测试代码:
监控温度传感器的温度值,如果温度值在10秒钟之内连续上升,则报警。
package com.root.process;
import com.root.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* @author Kewei
* @Date 2022/3/7 9:51
*/
public class ProcessTest2_ApplicationCase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// KeyBy之前使用“id”可以分组,这这里无法运行,必须使用方法引用SensorReading::getId
// 使用process可以做获取当前的id,运行时间戳、以及水位watermark
dataStream.keyBy(SensorReading::getId)
.process(new MyKeyedProcess(Time.seconds(10).toMilliseconds()))
.print();
env.execute();
}
public static class MyKeyedProcess extends KeyedProcessFunction<String, SensorReading,String>{
// 设置预警的时间
private Long interval;
public MyKeyedProcess(Long interval){
this.interval = interval;
}
// 设置值状态,保存上一次的温度
ValueState<Double> lastTemp;
// 设置值状态,保存最近一次预警的时间
ValueState<Long> recentTimerTimeStamp;
// 设置预警时调用的函数
@Override
public void onTimer(long timestamp, KeyedProcessFunction<String, SensorReading, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
// getCurrentKey()获取当前key,out.collect设置输出内容
out.collect("传感器 :"+ctx.getCurrentKey()+"温度值连续"+interval+"ms上升");
// 预警之后清楚最近一次预警的时间
recentTimerTimeStamp.clear();
}
@Override
public void open(Configuration parameters) throws Exception {
// 初始化两个状态
lastTemp = getRuntimeContext().getState(new ValueStateDescriptor<Double>("double",Double.class));
recentTimerTimeStamp = getRuntimeContext().getState(new ValueStateDescriptor<Long>("long",Long.class));
}
@Override
public void close() throws Exception {
// 清除两个状态的值
lastTemp.clear();
recentTimerTimeStamp.clear();
}
@Override
public void processElement(SensorReading value, KeyedProcessFunction<String, SensorReading, String>.Context ctx, Collector<String> out) throws Exception {
// 获取当前时间的温度
Double curTemp = value.getTemperature();
// 获取前一次的温度
Double lastTemper = lastTemp.value();
// 判断前一次的温度是否存在,若不存在使用当前温度,若存在使用本身,便于之后的比较
lastTemper = lastTemper==null?curTemp:lastTemper;
// 获取最近一次预警的时间
Long timerTime = recentTimerTimeStamp.value();
// 判断当前温度是否大于前一次温度,并且最近一次预警的时间为空(说明还没有预警)
// 若都满足,设置一个预警时间,注册一个触发器,并更新最近一次预警时间
if (curTemp>lastTemper && null == timerTime){
long warningTimer = ctx.timerService().currentProcessingTime() + interval;
ctx.timerService().registerProcessingTimeTimer(warningTimer);
recentTimerTimeStamp.update(warningTimer);
}
// 若当前温度小于等于前一次温度,以及前一次预警时间不为空(说明已经预警了)
// 若都满足,就是说温度下降了,需要把触发器删除,以及清除预警时间
if (curTemp <= lastTemper && timerTime != null){
ctx.timerService().deleteProcessingTimeTimer(timerTime);
recentTimerTimeStamp.clear();
}
// 更新上次温度
lastTemp.update(curTemp);
}
}
}
输出
传感器 :sensor_1温度值连续10000ms上升
三、侧输出流(SideOutput)
一个数据可以被多个window包含,只有其不被window包含的时候(包含该数据的所有window都关闭之后),才会被丢到侧输出流。
如果一个数据被丢到侧输出流,那么所有包含该数据的window都由于已经超过了“允许的迟到的时间”而关闭了,进而新来的迟到数据只能被丢到测输出流!
- 大部分的DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。
- processfunction 的side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。
- 一个side output 可以定义为OutputTag[X]对象,X 是输出流的数据类型。
- processfunction 可以通过Context 对象发射一个事件到一个或者多个side outputs。
测试代码:
将温度≥30放入高温输出,反正放入低温测流输出。
package com.root.process;
import com.root.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* @author Kewei
* @Date 2022/3/7 10:47
*/
public class ProcessTest3_SideOutCase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 定义一个OutputTag,用来表示侧输出流低温流
// An OutputTag must always be an anonymous inner class
// OutputTag必须始终是匿名的内部类
// so that Flink can derive a TypeInformation for the generic type parameter.
// 这样Flink就可以导出泛型类型参数的类型信息
OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("lowTemp") {
};
SingleOutputStreamOperator<SensorReading> resultStream = dataStream.keyBy(SensorReading::getId)
.process(new ProcessFunction<SensorReading, SensorReading>() {
@Override
public void processElement(SensorReading value, ProcessFunction<SensorReading, SensorReading>.Context ctx, Collector<SensorReading> out) throws Exception {
Double curTemp = value.getTemperature();
// 若温度大于30,正常输出,否则将数据写道测流中,context.output
if (curTemp > 30.0) {
out.collect(value);
} else {
ctx.output(outputTag, value);
}
}
});
resultStream.print("high-temp");
// 获取测流的数据
resultStream.getSideOutput(outputTag).print("low-temp");
env.execute();
}
}
输出
high-temp> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
low-temp> SensorReading{id='sensor_1', timestamp=1547718209, temperature=29.5}
四、CoProcessFunction
- 对于两条输入流,DataStream API 提供了CoProcessFunction 这样的low-level操作。CoProcessFunction 提供了操作每一个输入流的方法:
processElement1()
和processElement2()
。 - 类似于ProcessFunction,这两种方法都通过Context 对象来调用。这个Context对象可以访问事件数据,定时器时间戳,TimerService,以及side outputs。
- CoProcessFunction 也提供了onTimer()回调函数。