0
点赞
收藏
分享

微信扫一扫

SparkStreaming滑动窗口reduceByKeyAndWindow

RIOChing 2022-01-06 阅读 63

需求

在这里插入图片描述

代码实现

 package sparkstreaming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object Demo5Window {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
    conf.setMaster("local[2]")
    conf.setAppName("Demo5Window")

    val sc: SparkContext = new SparkContext(conf)

    /**
      *创建SparkStreaming环境
      * 指定多久运行一次
      */
    val ssc: StreamingContext = new StreamingContext(sc, Durations.seconds(5))

    /**
      * 设置checkpoint路径
      */
    ssc.checkpoint("SparkLearning/src/main/data/checkpoint")

    //读取数据

    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master",8888)

    val kvDS: DStream[(String, Int)] = linesDS.flatMap(_.split(","))
      .map((_, 1))

    /**
      * 统计近15秒带刺的数量,每隔五秒统计一次
      *
      * 窗口的大小,必须是 滑动时间 的 整数倍
      */
    /*val countDS: DStream[(String, Int)] = kvDS.reduceByKeyAndWindow(
      (x: Int, y: Int) => x + y, //聚合函数
      Durations.seconds(15), //窗口的大小,必须是一个batch滑动时间的整数倍
      Durations.seconds(5) //滑动时间

    )*/

    /**
      * 如果窗口存在交叉的情况下,会重复计算数据
      * 所以可以对窗口进行优化
      */
    val countDS: DStream[(String, Int)] = kvDS.reduceByKeyAndWindow(
      (x: Int, y: Int) => x + y, //聚合函数
      (i: Int, j: Int) => i - j,//减去多余数据的函数
      Durations.seconds(15), //窗口的大小,必须是一个batch滑动时间的整数倍
      Durations.seconds(5) //滑动时间
    )

    countDS
        .filter(_._2 !=0)
      .print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}

举报

相关推荐

0 条评论