0
点赞
收藏
分享

微信扫一扫

SparkStreaming整合Flume-Pull方式(核心)

------------------------------------------SparkStreaming第二种方式整合Flume-----------------------------------
详细学习地址:https://spark.apache.org/docs/latest/streaming-flume-integration.html
注意这种方式相比第一种方式更可靠,支持容错,工作环境中更常用(事务机制保障),条件是接收到,而且有副本才会去处理
Pull-based Approach using a Custom Sink
Pull方式整合
步骤一:编写flume配置文件(建议参考官方文档)

Flume Agent的编写: flume_pull_streaming.conf


simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel


simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop000
simple-agent.sources.netcat-source.port = 44444


simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop000
simple-agent.sinks.spark-sink.port = 41414


simple-agent.channels.memory-channel.type = memory


simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel步骤二:编写测试类(wordcount)
-------------------------------------------------------------------------------------------------------------------------
测试类:
package com.imooc.spark


import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
* Spark Streaming整合Flume的第二种方式
*/
object FlumePullWordCount {


def main(args: Array[String]): Unit = {


if(args.length != 2) {
System.err.println("Usage: FlumePullWordCount <hostname> <port>")
System.exit(1)
}


val Array(hostname, port) = args


val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("FlumePullWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))


//TODO... 如何使用SparkStreaming整合Flume
val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)


flumeStream.map(x=> new String(x.event.getBody.array()).trim)
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()


ssc.start()
ssc.awaitTermination()
}
}pom.xml依赖:

<!-- Spark Streaming整合Flume 依赖-->

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-flume_2.11</artifactId>

<version>${spark.version}</version>

</dependency>



<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-flume-sink_2.11</artifactId>

<version>${spark.version}</version>

</dependency>



<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>

<version>${spark.version}</version>

</dependency>



<dependency>

<groupId>org.apache.commons</groupId>

<artifactId>commons-lang3</artifactId>

<version>3.5</version>

</dependency>

------------------------------------------------------------------------------------------------------------------------------------

本地联调:
注意点:先启动flume 后启动Spark Streaming应用程序
注意在idea配置参数,edit configurations ----->programe arguments hadoop000 41414
flume-ng agent \

--name simple-agent \

--conf $FLUME_HOME/conf \

--conf-file $FLUME_HOME/conf/flume_pull_streaming.conf \

-Dflume.root.logger=INFO,console


spark-submit \

--class com.imooc.spark.FlumePullWordCount \

--master local[2] \

--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \

/home/hadoop/lib/sparktrain-1.0.jar \

hadoop000 41414

举报

相关推荐

0 条评论