Spark Components: Spark Streaming Learning 2 -- StatefulNetworkWordCount


1. Understanding

Unlike NetworkWordCount, StatefulNetworkWordCount keeps per-key state, so the word count accumulates across batches instead of being computed for a single batch only.

The accumulation is implemented with updateStateByKey, which is passed the newUpdateFunc function:

val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)

newUpdateFunc adapts the per-key updateFunc so it can be applied to an iterator of (key, new values in this batch, previous state) tuples:

val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
  iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}
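
To see the accumulation in isolation, here is a minimal, self-contained sketch (not part of the example; the object name and the simulated batches are made up) that replays the same (Seq[Int], Option[Int]) => Option[Int] update logic over a few batches for a single key:

object UpdateFuncDemo {
  // Same shape as updateFunc in the example:
  // values seen for a key in the current batch + previous state => new state
  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.sum
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  def main(args: Array[String]): Unit = {
    // Simulated per-batch occurrences of one word: 3 times, then once, then not at all
    val batches = Seq(Seq(1, 1, 1), Seq(1), Seq.empty[Int])

    // Fold each batch through updateFunc, printing the running state
    batches.foldLeft(Option.empty[Int]) { (state, batch) =>
      val newState = updateFunc(batch, state)
      println(s"batch=$batch state=$newState")
      newState
    }
    // Prints Some(3), Some(4), Some(4): the count never resets between batches
  }
}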


2. Running

Start the Netcat server (nc) and the Spark example separately, in two different terminals.
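
For reference, the example's own Scaladoc (see the source in section 3) gives the two commands; since the class here lives in the package org.apache.spark.Streaming.learning, adjust the fully qualified class name accordingly if you launch it that way:

$ nc -lk 9999
$ bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999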

Input:

hadoop@Mcnode6:~$ nc -lk 9999
hellO
world
hello
waaa

a
a
a
a
a
a
a
ahello
hello
hello
hello
a
a
a
a
a
a
b
b
b
b

b
b
b


Output (excerpt):

-------------------------------------------
Time: 1461662247000 ms
-------------------------------------------
(hellO,1)
(,1)
(hello,5)
(waaa,1)
(world,2)
(a,13)
(ahello,1)



-------------------------------------------
Time: 1461662250000 ms
-------------------------------------------
(b,6)
(hellO,1)
(,2)
(hello,5)
(waaa,1)
(world,2)
(a,13)
(ahello,1)


This shows the cumulative WordCount behaviour: the second batch only received "b" lines, yet the earlier counts such as (a,13), (hello,5) and (world,2) are carried over from the stored state instead of being reset. Note also that "hello" and "world" are seeded with a count of 1 by the initialRDD in the source below, which is why a single input line of "world" already shows (world,2); the (,1) and (,2) entries come from the blank input lines.
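
As a quick sanity check of those numbers (a hypothetical snippet, not part of the example), the per-key arithmetic is just updateFunc applied to the batch values and the previous state:

val updateFunc = (values: Seq[Int], state: Option[Int]) =>
  Some(values.sum + state.getOrElse(0))

// "world": one occurrence in the input, previous state 1 from the ("world", 1) seed in initialRDD
updateFunc(Seq(1), Some(1))            // Some(2) -- matches (world,2) in the output

// "a": no occurrences in the second batch, previous state 13
updateFunc(Seq.empty[Int], Some(13))   // Some(13) -- the count is simply carried over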



3. Source code:

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.Streaming.learning

import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import scala.Option.option2Iterable

/**
* Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
* second starting with initial value of word count.
* Usage: StatefulNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
* data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example
* org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999`
*/
object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    // Initial RDD input to updateStateByKey
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey
    // This will give a Dstream made of state (which is the cumulative count of the words)
    val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println


