需求：从 socket（master:8888）读取以逗号分隔的单词，每隔 5 秒统计一次最近 15 秒内每个单词出现的次数。

代码实现
package sparkstreaming
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
/**
 * Sliding-window word count over a socket text stream.
 *
 * Reads comma-separated words from `master:8888`, and every 5 seconds prints the
 * number of occurrences of each word seen during the last 15 seconds.
 *
 * The window is maintained incrementally: counts of the batch entering the window
 * are added and counts of the batch leaving the window are subtracted, instead of
 * re-reducing all batches of the window on every slide.
 */
object Demo5Window {

  /**
   * Entry point: builds the streaming pipeline, starts it and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    // Two local threads: the socket receiver permanently occupies one of them,
    // so at least one more is needed to actually process the batches.
    conf.setMaster("local[2]")
    conf.setAppName("Demo5Window")

    val sc: SparkContext = new SparkContext(conf)
    // Batch interval of 5 seconds; window and slide durations below are multiples of it.
    val ssc: StreamingContext = new StreamingContext(sc, Durations.seconds(5))

    try {
      // The inverse-reduce variant of reduceByKeyAndWindow keeps state between
      // batches, which requires a checkpoint directory.
      ssc.checkpoint("SparkLearning/src/main/data/checkpoint")

      val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

      // One (word, 1) pair per comma-separated token.
      val kvDS: DStream[(String, Int)] = linesDS
        .flatMap(_.split(","))
        .map((_, 1))

      val countDS: DStream[(String, Int)] = kvDS.reduceByKeyAndWindow(
        (x: Int, y: Int) => x + y, // add counts of the batch entering the window
        (i: Int, j: Int) => i - j, // subtract counts of the batch leaving the window
        Durations.seconds(15),     // window length
        Durations.seconds(5),      // slide interval
        // Evict keys whose count dropped to zero from the window state itself.
        // Filtering only before print() would hide them from the output but keep
        // every key ever seen in the state, which then grows without bound.
        filterFunc = (kv: (String, Int)) => kv._2 != 0
      )

      countDS.print()

      ssc.start()
      ssc.awaitTermination()
    } finally {
      // Always release the streaming and Spark contexts, including when
      // awaitTermination() rethrows an error raised by a streaming job.
      ssc.stop()
    }
  }
}