0
点赞
收藏
分享

微信扫一扫

大数据系列-SPARK-STREAMING流数据transform

半夜放水 2022-03-20 阅读 49

package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

//有状态函数transform
//用于DSTREAM转换RDD增加功能
//用于周期执行
//Demonstrates DStream.transform: applies an arbitrary RDD-to-RDD function
//to every batch of the stream, giving access to the full RDD API.
//NOTE: despite the original comment ("有状态" / stateful), transform is
//STATELESS — it carries no state across batches. Stateful processing is
//done with updateStateByKey / mapWithState instead.
object SparkStreamingStateTransform {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("SparkStreamingState").setMaster("local[*]")
    // 5-second batch interval.
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))
    // Checkpoint dir — only required for stateful/windowed ops; harmless here.
    streamingContext.checkpoint("data/cpDir")

    // Receive newline-delimited text from a local socket (e.g. `nc -lk 8600`).
    val dstream: ReceiverInputDStream[String] = streamingContext.socketTextStream("localhost", 8600)

    // The transform closure itself runs on the DRIVER, once per batch
    // interval; only the operations applied to the RDD inside it (rdd.map)
    // execute on the EXECUTORS.
    val dstream1: DStream[String] = dstream.transform(
      rdd => {
        // Driver side — re-evaluated for every batch, so per-batch driver
        // logic (e.g. refreshing a broadcast) can live here.
        rdd.map(str => {
          // Executor side — per-record processing (identity here).
          str
        })
      }
    )

    // BUG FIX: a DStream lineage without at least one output operation makes
    // start() fail with "No output operations registered, so nothing to
    // execute". Print the first records of each batch to the driver stdout.
    dstream1.print()

    streamingContext.start()
    streamingContext.awaitTermination()

  }

}
举报

相关推荐

0 条评论