0
点赞
收藏
分享

微信扫一扫

spark大数据分析:sparkStrreaming(17) 时间窗概念解析

捌柒陆壹 2022-02-10 阅读 32



文章目录


  • ​​批处理间隔​​
  • ​​窗口时间宽度与滑动时间宽度​​


批处理间隔

val ssc = new StreamingContext(sc, Seconds(5))

对于spark处理数据,数据以流式方式进入划分为一个批次一个批次的,每一段数据合并成一个RDD,并将RDD添加到DStream的HashMap中进行维护,因此数据的处理时间要小于间隔时间,否则造成数据堆压

窗口时间宽度与滑动时间宽度

对于多个批次指定对应的起始批次与结束批次对应的时间,这个时间区间就是"窗口时间宽度",

随着时间推进,只指定了窗口时间宽度不能动态持续对数据进行局部聚合,设置滑动时间宽度,随着窗口推移,持续按照指定的宽度移动

注意:窗口时间宽度与滑动时间宽度大小必须是批处理间隔整数倍

案例:每2s统计10s内平均温度

package stream

import java.util

import com.alibaba.fastjson.{JSON, TypeReference}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Chapter8_4_2")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))

val jsonDstream = ssc.socketTextStream("note01", 8888)
val cityAndTemp = jsonDstream.map(json => {
val json2JavaMap = JSON.parseObject(json, new TypeReference[util.Map[String, String]]() {})
import scala.collection.JavaConverters._
val json2ScalaMap = json2JavaMap.asScala
json2ScalaMap
})
cityAndTemp.map(scalaMap => (scalaMap("city"),scalaMap("temp"))).mapValues(temp => (temp.toFloat,1))
.reduceByKeyAndWindow(
(t1:(Float,Int),t2:(Float,Int)) => (t1._1+t2._1,t1._2+t2._2),
Seconds(10),
Seconds(2)
).mapValues(x => x._1/x._2).print()

ssc.start()
ssc.awaitTermination()
}
}



举报

相关推荐

0 条评论