For more code, see: https://github.com/xubo245/SparkLearning
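1. Overview
WindowsWordCount is an application of the sliding-window technique: the word counts are computed over a window that spans several batches, and that window slides forward over time. This makes it possible to report statistics for, say, the most recent 30 seconds or the most recent hour. The individual batch interval can stay at 1 second, while the window slides, and the statistics are recomputed, every 10 seconds or every half hour.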
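To make the relationship between batch, window length, and slide interval concrete, here is a minimal sketch in plain Scala with no Spark dependency. It is not how Spark Streaming implements windows; the object name, the function `windowedCounts`, and the sample batches are all hypothetical, and both window length and slide are expressed as a number of batches.

```scala
object SlidingWindowSketch {
  // Count words over every window of `windowLen` batches, advancing by `slide` batches.
  // A result is produced after batch number slide, 2*slide, ... and covers the
  // `windowLen` most recent batches (fewer at the very start of the stream).
  def windowedCounts(batches: Seq[Seq[String]], windowLen: Int, slide: Int): Seq[Map[String, Int]] = {
    require(windowLen > 0 && slide > 0, "window length and slide must be positive")
    (slide to batches.length by slide).map { end =>
      val start = math.max(0, end - windowLen)
      batches.slice(start, end).flatten
        .groupBy(identity)
        .map { case (word, occurrences) => (word, occurrences.size) }
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical input: four batches of words.
    val batches = Seq(Seq("a", "a"), Seq("b"), Seq("a"), Seq("b", "b"))
    // A window of 2 batches, recomputed after every batch.
    windowedCounts(batches, windowLen = 2, slide = 1).foreach(println)
  }
}
```

With `windowLen = 2` and `slide = 1`, each batch contributes to two consecutive results, which is the overlap that distinguishes a sliding window from a plain per-batch count.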
2. Running:
Input:
hadoop@Master:~/cloud/testByXubo/spark/Streaming$ nc -lk 9999
word
a
a
a
a
a
world
a
word
word
world
hello
a
a
a
a
a
hello
xubo
a
a
a
a
a
ab
b
b
b
bb
b
^C
Output:
hadoop@Master:~/cloud/testByXubo/spark/Streaming/windowsWordCount$ ./submitJob.sh
-------------------------------------------
Time: 1461680330000 ms
-------------------------------------------
-------------------------------------------
Time: 1461680340000 ms
-------------------------------------------
(word,1)
(world,1)
(a,5)
-------------------------------------------
Time: 1461680350000 ms
-------------------------------------------
(word,3)
(hello,1)
(world,2)
(a,6)
-------------------------------------------
Time: 1461680360000 ms
-------------------------------------------
(word,3)
(hello,2)
(world,2)
(a,11)
-------------------------------------------
Time: 1461680370000 ms
-------------------------------------------
(xubo,1)
(word,2)
(hello,2)
(world,1)
(a,6)
-------------------------------------------
Time: 1461680380000 ms
-------------------------------------------
(xubo,1)
(hello,1)
(a,5)
-------------------------------------------
Time: 1461680390000 ms
-------------------------------------------
(b,4)
(xubo,1)
(bb,1)
(a,5)
(ab,1)
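The timestamps above advance by 10 000 ms, and the counts for `a` (5, 6, 11, 6, 5, 5) are consistent with a 30-second window that slides every 10 seconds. The arguments passed by submitJob.sh are not shown, so those two values are an assumption, as are the per-slice counts below, which were inferred from the printed output. The following sketch (hypothetical object and function names) rebuilds the windowed counts for `a` from those slices:

```scala
object WindowOutputCheck {
  // Sum the `slicesPerWindow` most recent slices at each step; the earliest
  // steps simply have fewer slices available.
  def windowSums(sliceCounts: Seq[Int], slicesPerWindow: Int): Seq[Int] =
    sliceCounts.indices.map { i =>
      sliceCounts.slice(math.max(0, i - slicesPerWindow + 1), i + 1).sum
    }

  def main(args: Array[String]): Unit = {
    // Assumed number of "a" lines received in each 10-second slice, for the
    // slices ending at 1461680330000, 1461680340000, ..., 1461680390000 ms.
    val aPerSlice = Seq(0, 5, 1, 5, 0, 0, 5)
    // Assumed: 30-second window / 10-second slide = 3 slices per window.
    println(windowSums(aPerSlice, 3).mkString(", ")) // prints 0, 5, 6, 11, 6, 5, 5
  }
}
```

The slices sum to 16, which matches the 16 `a` lines typed into nc, and the count of 11 at 1461680360000 ms is the sum of three consecutive slices (5 + 1 + 5).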
4. Source code:
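package org.apache.spark.Streaming.learning

import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
// Implicit conversions for pair DStreams (required on Spark versions before 1.3)
import org.apache.spark.streaming.StreamingContext._

object WindowsWordCount {
  // Arguments: <hostname> <port> <windowSeconds> <slideSeconds>
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: WindowsWordCount <hostname> <port> <windowSeconds> <slideSeconds>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    // Checkpoint directory; required by the incremental (inverse-function) variant below
    ssc.checkpoint(".")
    // Receive lines of text from the socket
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    // Split each line on commas; a line without a comma counts as a single word
    val words = lines.flatMap(_.split(","))
    // Window operation: args(2) is the window length and args(3) the slide interval,
    // both in seconds; each must be a multiple of the 5-second batch interval
    val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt))
    // Alternative: incremental version that adds entering data and subtracts leaving data
    //val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow(_+_, _-_,Seconds(args(2).toInt), Seconds(args(3).toInt))
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}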
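
The commented-out call in the source passes a second function, `_-_`, to `reduceByKeyAndWindow`. With that variant the new window is derived from the previous one by adding the data that entered and subtracting the data that left, instead of re-reducing the whole window, and Spark requires checkpointing to be enabled for it. The sketch below illustrates only the arithmetic in plain Scala; the object and function names are hypothetical, and the slice contents are inferred from the output above under the assumption of a 30-second window and a 10-second slide.

```scala
object IncrementalWindowSketch {
  type Counts = Map[String, Int]

  // Merge two count maps with a binary function, treating missing keys as 0.
  def combine(x: Counts, y: Counts)(f: (Int, Int) => Int): Counts =
    (x.keySet ++ y.keySet).map(k => k -> f(x.getOrElse(k, 0), y.getOrElse(k, 0))).toMap

  // New window = previous window + slice that entered - slice that left.
  def slideWindow(previous: Counts, entering: Counts, leaving: Counts): Counts = {
    val updated = combine(combine(previous, entering)(_ + _), leaving)(_ - _)
    // Drop words whose count fell to 0 so they no longer appear in the result.
    updated.filter { case (_, n) => n > 0 }
  }

  def main(args: Array[String]): Unit = {
    // Window printed at 1461680360000 ms.
    val previous = Map("word" -> 3, "hello" -> 2, "world" -> 2, "a" -> 11)
    // Inferred slice entering the window by 1461680370000 ms.
    val entering = Map("xubo" -> 1)
    // Inferred slice leaving the window (the data first seen at 1461680340000 ms).
    val leaving = Map("word" -> 1, "world" -> 1, "a" -> 5)
    println(slideWindow(previous, entering, leaving))
  }
}
```

The result contains the same counts as the block printed at 1461680370000 ms. The explicit filter step matters in this sketch: Spark's inverse-function overload accepts an optional filter function for the same purpose, and without one, keys whose count has dropped to 0 stay in the windowed stream.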