Flink Windows Explained: The Window Aggregate Function AggregateFunction

陬者 2022-07-04

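Like ReduceFunction, AggregateFunction is an incremental aggregation function: it keeps an intermediate state (the accumulator) and updates it as each element arrives, instead of buffering every element of the window. AggregateFunction is more general for window computations, because its input type, accumulator type, and result type can all differ, whereas a ReduceFunction must produce the same type it consumes. The price of this flexibility is a somewhat more involved implementation. The AggregateFunction interface defines four methods to override: createAccumulator() creates the initial accumulator, add() defines how an incoming element is added to the accumulator, getResult() computes the final result from the accumulator, and merge() defines how two accumulators are combined.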
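To make the point about differing types concrete, here is a minimal sketch (not from the original post) of an AggregateFunction whose input, accumulator, and output types are all different: it averages call durations. The class name AvgDurationAggregate and the (sid, duration) tuple input are hypothetical; only the interface and its four methods come from Flink's org.apache.flink.api.common.functions.AggregateFunction.

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Hypothetical example: average call duration per key.
// IN = (sid, duration), ACC = (sum of durations, element count), OUT = average.
class AvgDurationAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  // Start with an empty accumulator: zero total duration, zero elements
  override def createAccumulator(): (Long, Long) = (0L, 0L)

  // Called once per incoming element: add its duration and bump the count
  override def add(value: (String, Long), acc: (Long, Long)): (Long, Long) =
    (acc._1 + value._2, acc._2 + 1)

  // Called when the window fires: turn (sum, count) into an average
  override def getResult(acc: (Long, Long)): Double =
    if (acc._2 == 0L) 0.0 else acc._1.toDouble / acc._2

  // Combine two partial accumulators (used by merging windows, e.g. session windows)
  override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)
}
```

On a keyed, windowed stream of (sid, duration) tuples this could be applied with .aggregate(new AvgDurationAggregate). A ReduceFunction cannot express this directly, because two averages cannot be combined correctly without also carrying the element count, which is exactly what the separate accumulator type provides.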

```scala
package window

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * @Author yqq
 * @Date 2021/12/27 20:42
 * @Version 1.0
 */
case class StationLog(sid: String, callOut: String, callInput: String, callType: String, callTime: Long, duration: Long)

object AggregatFunctionTest {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    // Brings DataStream and the implicit TypeInformation needed by map() into scope
    import org.apache.flink.streaming.api.scala._
    // Every 5 seconds, count the log records of each base station over the last 8 seconds
    // Read the source: comma-separated lines from a socket
    val stream: DataStream[StationLog] = environment.socketTextStream("node1", 8888)
      .map(line => {
        val arr: Array[String] = line.split(",")
        StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })
    // Open the window
    stream.map(log => (log.sid, 1))
      .keyBy(_._1)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(8), Time.seconds(5))) // sliding window: size 8 s, slide 5 s
      .aggregate(new MyAggregateFuntion, new MyWindowFunction)
      .print()

    environment.execute()
  }

  // MyWindowFunction takes its input from MyAggregateFuntion: when the window fires,
  // MyAggregateFuntion.getResult runs first, and then apply is called with that result
  class MyWindowFunction extends WindowFunction[Long, (String, Long), String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[(String, Long)]): Unit = {
      out.collect((key, input.iterator.next())) // the iterator holds exactly one value, so next() returns it
    }
  }

  class MyAggregateFuntion extends AggregateFunction[(String, Int), Long, Long] {
    // Create the accumulator; it starts at 0
    override def createAccumulator(): Long = 0
    // Called once for every incoming element
    override def add(value: (String, Int), accumulator: Long): Long = accumulator + value._2
    // Called once when the window fires
    override def getResult(accumulator: Long): Long = accumulator
    // Combine two accumulators (used when windows are merged, e.g. session windows)
    override def merge(a: Long, b: Long): Long = a + b
  }
}
```
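The job counts, every 5 seconds, the logs of each base station over the last 8 seconds. Because the window size (8 s) is not a multiple of the slide (5 s), a given log line lands in either one or two windows. The helper below is a plain-Scala illustration, not part of Flink, that mirrors how a sliding window assigner picks the windows for one timestamp; it assumes the default offset of 0 and non-negative millisecond timestamps, and the name SlidingWindowSketch is hypothetical.

```scala
// Illustrative sketch, not Flink code: mirrors the sliding-window assignment rule
// for a single timestamp, assuming offset 0 and non-negative millisecond timestamps.
object SlidingWindowSketch {
  // Returns every [start, end) window that contains ts, latest start first
  def windowsFor(ts: Long, size: Long, slide: Long): List[(Long, Long)] = {
    val lastStart = ts - ts % slide // latest window start at or before ts
    Iterator
      .iterate(lastStart)(_ - slide)         // step back one slide at a time
      .takeWhile(start => start > ts - size) // keep windows that still cover ts
      .map(start => (start, start + size))
      .toList
  }
}
```

With size 8000 and slide 5000, an element at 6000 ms falls into [5000, 13000) and [0, 8000), while an element at 9000 ms falls only into [5000, 13000). So in the printed output, some log lines contribute to two consecutive counts and others to just one.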

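Combining the AggregateFunction with a window function keeps the state small (one accumulator per key and window) while still giving access to the window itself. The following is a hypothetical variant of MyWindowFunction (the class name CountWithWindowFunction and its 4-tuple output are illustrative assumptions) that also emits the window's start and end; it would be passed as .aggregate(new MyAggregateFuntion, new CountWithWindowFunction).

```scala
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical variant: emits (station id, window start, window end, count)
class CountWithWindowFunction
    extends WindowFunction[Long, (String, Long, Long, Long), String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long],
                     out: Collector[(String, Long, Long, Long)]): Unit = {
    // After pre-aggregation, input holds exactly one value: getResult's output
    val count = input.head
    out.collect((key, window.getStart, window.getEnd, count))
  }
}
```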

