// 1. 基于服务器 log 的热门页面浏览量统计
// 每隔 5 秒，输出最近 10 分钟内访问量最多的前 N 个 URL
package com.chuangyan.network35
import java.lang
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.Duration
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
/**
 * One parsed line of the access log, as produced by the parser in `NetworkFlow.main`.
 *
 * @param ip        first whitespace-separated field of the line
 * @param userId    second whitespace-separated field of the line
 * @param eventTime timestamp of the request in epoch milliseconds (parsed from the fourth field)
 * @param method    sixth whitespace-separated field, the request method
 * @param url       seventh whitespace-separated field, the requested URL
 */
case class ApacheLogEvent(
    ip: String,
    userId: String,
    eventTime: Long,
    method: String,
    url: String
)
object NetworkFlow {

  /** Access log read when no path is given as the first program argument. */
  private val DefaultLogPath: String =
    "D:\\study\\Code\\UserBehavior\\NetworkFlowAnalysis\\src\\main\\resources\\apache.log"

  /** Number of URLs ranked per window when no count is given as the second program argument. */
  private val DefaultTopN: Int = 5

  /**
   * Hot-page statistics over a server access log: every 5 seconds, prints the top-N most-visited
   * URLs of the last 10 minutes of event time, ignoring requests for "/".
   *
   * Each log line is split on single spaces: field 0 is the IP, 1 the user id, 3 the timestamp
   * (`dd/MM/yyyy:HH:mm:ss`), 5 the method and 6 the URL.
   *
   * @param args optional `args(0)`: path of the log file (defaults to [[DefaultLogPath]]);
   *             optional `args(1)`: how many URLs to report per window (defaults to [[DefaultTopN]])
   */
  def main(args: Array[String]): Unit = {
    // Explicit event-time sliding windows replace the `timeWindow` shortcut deprecated in Flink 1.12.
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows

    val logPath = args.headOption.getOrElse(DefaultLogPath)
    val topN = if (args.length > 1) args(1).toInt else DefaultTopN

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)
    env.setParallelism(1)

    // Created once instead of once per line. SimpleDateFormat is Serializable, so it is shipped
    // with the map function and each parallel instance works on its own copy (it is not thread-safe).
    val timeFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")

    val source: DataStream[String] = env.readTextFile(logPath)
    val dataStream: DataStream[ApacheLogEvent] = source
      .map { line =>
        val fields = line.split(" ")
        val eventTime = timeFormat.parse(fields(3)).getTime
        ApacheLogEvent(fields(0), fields(1), eventTime, fields(5), fields(6))
      }
      .assignTimestampsAndWatermarks(
        // The explicit type argument is required: without it Scala infers `Nothing` for the
        // strategy and the ApacheLogEvent timestamp assigner below does not type-check.
        WatermarkStrategy
          .forBoundedOutOfOrderness[ApacheLogEvent](Duration.ofSeconds(2))
          .withTimestampAssigner(new SerializableTimestampAssigner[ApacheLogEvent] {
            override def extractTimestamp(element: ApacheLogEvent, recordTimestamp: Long): Long =
              element.eventTime
          })
      )

    // Every 5 seconds, count each URL's hits over the last 10 minutes.
    dataStream
      .filter(_.url != "/")
      .keyBy(_.url)
      .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(5)))
      .aggregate(new CountAgg(), new WindowResultFunction())
      // All counts of one window carry the same windowEnd, so keying by it groups them for ranking.
      .keyBy(_.windowEnd)
      .process(new TopNHotUrls(topN))
      .print()

    env.execute("url job")
  }
}
/**
 * Incremental window aggregate that counts log events.
 * The accumulator and the final result are the same running `Long` count.
 */
class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {

  /** A fresh window starts counting from zero. */
  override def createAccumulator(): Long = 0L

  /** Every event adds one to the count; the event's content is not inspected. */
  override def add(in: ApacheLogEvent, acc: Long): Long = 1L + acc

  /** The running count is already the result. */
  override def getResult(acc: Long): Long = acc

  /** Two partial counts combine by addition. */
  override def merge(acc: Long, acc1: Long): Long = acc1 + acc
}
/**
 * Number of views of one URL within one window.
 *
 * @param url       the requested URL (the window's key)
 * @param windowEnd end timestamp of the window, as returned by `TimeWindow.getEnd`
 * @param count     number of requests for `url` counted in that window
 */
case class UrlViewCount(
    url: String,
    windowEnd: Long,
    count: Long
)
/**
 * Turns the pre-aggregated count of one URL in one window into a [[UrlViewCount]], tagged with
 * the window's end timestamp so the results of a window can later be grouped and ranked.
 *
 * When used after `aggregate(new CountAgg(), ...)`, `input` holds the single aggregated count.
 */
class WindowResultFunction extends WindowFunction[Long, UrlViewCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit =
    out.collect(UrlViewCount(url = key, windowEnd = window.getEnd, count = input.head))
}
/**
 * Ranks the URLs of one sliding window by view count.
 *
 * The input is keyed by `windowEnd`, so each key buffers the [[UrlViewCount]]s of a single window.
 * An event-time timer at `windowEnd + 1` fires once the watermark has passed the window end; the
 * buffered counts are then sorted by descending count, the first `size` are formatted into one
 * string, and the key's state is cleared.
 *
 * @param size             how many URLs to include in each ranking
 * @param outputIntervalMs milliseconds to sleep before emitting each ranking so that console output
 *                         scrolls like a live feed; this blocks the operator thread. `0` disables it.
 *                         Defaults to the previously hard-coded 1000 ms.
 */
class TopNHotUrls(size: Int, outputIntervalMs: Long = 1000L)
    extends KeyedProcessFunction[Long, UrlViewCount, String] {

  // Counts buffered for the current key, i.e. for one window.
  var listState: ListState[UrlViewCount] = _

  override def open(parameters: Configuration): Unit = {
    listState = getRuntimeContext.getListState(
      new ListStateDescriptor[UrlViewCount]("listState", classOf[UrlViewCount]))
  }

  override def processElement(i: UrlViewCount,
                              context: KeyedProcessFunction[Long, UrlViewCount, String]#Context,
                              collector: Collector[String]): Unit = {
    listState.add(i)
    // All elements of this key share the same windowEnd, so they all request the same timer.
    context.timerService().registerEventTimeTimer(i.windowEnd + 1)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    import scala.collection.JavaConverters._

    // ListState#get may return null when the state is empty; treat that as no counts.
    val viewCounts: List[UrlViewCount] =
      Option(listState.get()).map(_.asScala.toList).getOrElse(Nil)
    // The ranking for this windowEnd is emitted exactly here. Without clearing, one list per
    // window would stay in keyed state indefinitely.
    listState.clear()

    // Highest count first; ties keep their arrival order.
    val topUrls = viewCounts.sortBy(_.count)(Ordering[Long].reverse).take(size)

    // Format the ranking as a single printable string.
    val result = new StringBuilder
    result.append("====================\n")
    // The timer fired at windowEnd + 1, so timestamp - 1 is the window end.
    result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")
    topUrls.zipWithIndex.foreach { case (view, idx) =>
      result.append("No.").append(idx + 1).append(":")
        .append("URL=").append(view.url)
        .append(" 浏览量=").append(view.count).append("\n")
    }
    result.append("===================")

    // Throttle output to simulate a live, scrolling ranking.
    if (outputIntervalMs > 0) Thread.sleep(outputIntervalMs)
    out.collect(result.toString)
  }
}