介绍
侧输出流(Side Output)说白了就是可以将一个流分成两个流:一个主流和一个侧输出流,而且侧输出流的数据类型可以与主流不同。
代码
import com.atguigu.apitest.SensorReading
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
 * Side-output demo.
 *
 * Feeds a fixed list of [[SensorReading]]s through [[SplitTempProcessor]] with a
 * threshold of 30.0, then prints the main ("high") stream and the "low-temp"
 * side output separately.
 */
object SideOutputTest2 {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Single parallel instance for this demo.
    env.setParallelism(1)

    val readings = List(
      SensorReading("sensor_1", 1547718199, 35.8),
      SensorReading("sensor_6", 1547718201, 15.4),
      SensorReading("sensor_7", 1547718202, 6.7),
      SensorReading("sensor_10", 1547718205, 38.1),
      SensorReading("sensor_1", 1547718207, 37.2),
      SensorReading("sensor_1", 1547718212, 33.5),
      SensorReading("sensor_1", 1547718215, 38.1)
    )
    val dataStream: DataStream[SensorReading] = env.fromCollection(readings)

    // Tag used to fetch the side output. Its id and element type must match
    // the tag that SplitTempProcessor emits low readings to.
    val lowTempTag = new OutputTag[(String, Double, Long)]("low-temp")

    // Split the stream with a ProcessFunction: the main output keeps the
    // high readings; the side output holds everything else.
    val highTempStream: DataStream[SensorReading] =
      dataStream.process(new SplitTempProcessor(30.0))
    val lowTempStream: DataStream[(String, Double, Long)] =
      highTempStream.getSideOutput(lowTempTag)

    highTempStream.print("high")
    lowTempStream.print("low")

    env.execute("side output job")
  }
}
/**
 * Custom ProcessFunction that splits readings into high and low temperature.
 *
 * The ProcessFunction type parameters are (input type, main-output type).
 *
 * Readings whose temperature is strictly greater than `threshold` are emitted
 * unchanged on the main output. All other readings, including those exactly
 * equal to `threshold`, are emitted on the [[SplitTempProcessor.LowTempTag]]
 * side output as `(id, temperature, timestamp)` tuples. A side output may
 * carry a different element type than the main output.
 *
 * @param threshold temperature boundary between the main and side outputs
 */
class SplitTempProcessor(threshold: Double) extends ProcessFunction[SensorReading, SensorReading] {

  override def processElement(
      value: SensorReading,
      ctx: ProcessFunction[SensorReading, SensorReading]#Context,
      out: Collector[SensorReading]): Unit = {
    if (value.temperature > threshold) {
      // Main output: forward the reading as-is (business logic can go here).
      out.collect(value)
    } else {
      // Side output (business logic can go here too). Reuse the shared tag
      // rather than allocating a new OutputTag, and its implicit
      // TypeInformation, for every element.
      ctx.output(SplitTempProcessor.LowTempTag, (value.id, value.temperature, value.timestamp))
    }
  }
}

/** Shared definitions for [[SplitTempProcessor]]. */
object SplitTempProcessor {

  /**
   * Tag of the low-temperature side output.
   *
   * Consumers can pass it to `getSideOutput`. Any tag they build themselves
   * must use the same id ("low-temp") and element type.
   */
  val LowTempTag: OutputTag[(String, Double, Long)] =
    new OutputTag[(String, Double, Long)]("low-temp")
}
执行结果
high> SensorReading(sensor_1,1547718199,35.8)
low> (sensor_6,15.4,1547718201)
low> (sensor_7,6.7,1547718202)
high> SensorReading(sensor_10,1547718205,38.1)
high> SensorReading(sensor_1,1547718207,37.2)
high> SensorReading(sensor_1,1547718212,33.5)
high> SensorReading(sensor_1,1547718215,38.1)
可以发现,temperature > 30.0 的数据会被分配到 high 流(主流),其余数据(包括恰好等于 30.0 的)会被分配到 low 流(侧输出流)。
这就是侧输出流:可以根据条件对一个流进行分流。