文章目录
- 1.transform
- 2.updateStateByKey
- 3.window
map (func)
对DStream中的各个元素进行func函数操作,然后返回一个新的DStream
flatMap (func)
与map方法类似,只不过各个输入项可以被输出为零个或多个输出项
filter (func)
过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream
repartition (numPartitions)
增加或减少DStream中的分区数,从而改变DStream的并行度
union (otherStream)
将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
count()
通过对DStreaim中 的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream
reduce (func)
对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream
countByValue ()
对于元素类型为K的DStream,返回一个元素为(K, Long)键值对形式的新的DStream, Long对应的值为源DStream中各个
RDD的key出现的次数
reduceByKey (func,[nomTasks])
利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream
join (otherStream, [numTasks] )
输入为(K,V)、(K,W) 类型的DStream, 返回一个新的(K,(v,w)类型的DStream
cogroup (otherStream,
[nonmTasks])
输入为(K,V)、 (K,W) 类型的DStream,返回一个新的(K,Seq[V], Seq[W]) 元组类型的
DStream
cogroup就是groupByKey的另外一种变体,groupByKey是操作一个K-V键值对,而cogroup一次操作两个,有点像join,不同之处在于返回值结果
val ds1:DStream[(K, V)]
val ds2:DStream[(K, W)]
val cg:DStream[(K, (Iterable[V], Iterable[W]))] = ds1.cogroup(ds1)
transform (func)
通过RDD-to-RDD函数作用于源码DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
updateStateByKey (func)
根据于key的前置状态和key的新值,对key进行更新, 返回一个新状态的Dstream
Window函数
window窗口操作,每个多长M时间,通过过往N长时间内产生的数据
这里主要介绍最后三个算子
下面所有的案例均在机器上使用nc命令打开9999端口发送消息来进行测试
nc -lk hadoop01 9999
1.transform
transform是一个transformation算子,转换算子。DStream上述提供的所有的transformation操作,都是DStream to DStream操作,没有一个DStream和RDD的直接操作,而DStream本质上是一系列RDD,所以RDD to RDD操作是显然被需要的,所以此时官方api中提供了一个为了达成此操作的算子——transform操作。
其最最最经典的实现就是DStream和rdd的join操作,还有dstream重分区(分区减少,coalsce)。
也就是说transform主要就是用来自定义官方api没有提供的一些操作。
Transform.scala
package sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @Author Daniel
* @Description transform算子
**/
object Transform {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("Transform")
.setMaster("local[*]")
val batchInterval = Seconds(2)
val ssc = new StreamingContext(conf, batchInterval)
val lines = ssc.socketTextStream("hadoop01", 9999)
//在transform中使用flatMap、map、reduceByKey、coalesce算子
val words = lines.transform(rdd => rdd.flatMap(_.split("\\s+")))
val pairs = words.transform(rdd => rdd.map((_, 1)))
val res = pairs.transform(rdd => rdd.reduceByKey(_ + _))
//使用coalesce算子减少分区
res.transform(_.coalesce(2))
res.print()
ssc.start()
ssc.awaitTermination()
}
}
2.updateStateByKey
这个算子一般不建议使用。根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的Dstream。就是统计截止到目前为止key的状态。
主要步骤如下:
- 定义状态:可以是任意数据类型
- 定义状态更新函数:用一个函数指定如何使用先前的状态,从输入流中的新值更新状态。对于有状态的操作,要不断的把当前和历史的时间切片的RDD累加计算,随着时间的流失,计算的数据规模会变得越来越大
- 要思考的是如果数据量很大的时候,或者对性能的要求极为苛刻的情况下,可以考虑将数据放在Redis或者tachyon或者ignite上
- 注意,updateStateByKey操作,要求必须开启Checkpoint机制。
UpdateStateByKey.scala
package sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
/**
* @Author Daniel
* @Description UpdateStateByKey算子
**/
object UpdateStateByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("_04UpdateStateByKeyOps")
.setMaster("local[*]")
val batchInterval = Seconds(2)
val ssc = new StreamingContext(conf, batchInterval)
//必须开启checkpoint机制
ssc.checkpoint("file:///F:/ssdata/checkpoint/ck1")
val lines = ssc.socketTextStream("hadoop01", 9999)
val words = lines.transform(rdd => rdd.flatMap(_.split("\\s+")))
val pairs = words.transform(rdd => rdd.map((_, 1)))
val ret = pairs.transform(rdd => rdd.reduceByKey(_ + _))
//依赖两个状态:一者前置状态,一者当前状态
val usb = ret.updateStateByKey(updateFunc)
usb.print()
ssc.start()
ssc.awaitTermination()
}
//状态更新函数,聚合截止目前为止所有key的状态
def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] = {
//seq为当前key的状态,option为key对应的历史值
Option(seq.sum + option.getOrElse(0))
}
}
3.window
window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。
红色的矩形就是一个窗口,窗口框住的是一段时间内的数据流。
这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。
所以基于窗口的操作,需要指定2个参数:
window length - The duration of the window (3 in the figure)
slide interval - The interval at which the window-based operation is performed (2 in the figure).
简而言之,window窗口操作就是每过M时间,计算N长时间内产生的数据,M就是滑动长度,N就是窗口长度。
Window.scala
package sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @Author Daniel
* @Description Window算子
**/
object Window {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("Window")
.setMaster("local[*]")
val batchInterval = 2
val ssc = new StreamingContext(conf, Seconds(batchInterval))
val lines = ssc.socketTextStream("hadoop01", 9999)
val words = lines.transform(rdd => rdd.flatMap(_.split("\\s+")))
val pairs = words.transform(rdd => rdd.map((_, 1)))
//每隔4s,统计过去6s单位内产生的数据
val windowDuration = Seconds(batchInterval * 3)
val slideDuration = Seconds(batchInterval * 2)
//windowDuration就是窗口长度,slideDuration就是滑动长度
val ret = pairs.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, windowDuration, slideDuration)
ret.print()
ssc.start()
ssc.awaitTermination()
}
}