sparkStreaming-WordCountDemo: processing socket data

止止_8fc8, 2022-07-14 (26 views)


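Step 1: Set up the environment. Pay particular attention to the hosts file on the Linux virtual machine and the hosts file on Windows, and turn off the firewall.

     linux:   sudo vim /etc/hosts  and add the IP-to-name mapping, so that other machines can reach this host by name

     window:   in the hosts file under C:\Windows\System32\drivers\etc, add the same mappings as in the Linux /etc/hosts, so that Windows can reach the Linux machine by name

Test:     ping <Linux VM IP or name> (checks network connectivity)

          window cmd : telnet <Linux VM IP or name> <port> (checks whether the port is reachable)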
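
The same two checks can also be run from the JVM on the Windows side, which is the view the Spark program itself will have. The following is a minimal sketch using only standard java.net classes. The object name ConnectivityCheck, its method names, and the 3-second timeout are illustrative assumptions; hadoop002 and 6789 are the host name and port used in the word count code of this post.

```scala
import java.net.{InetAddress, InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, not part of Spark: mirrors the ping/telnet checks above.
object ConnectivityCheck {

  // Resolve a host name to an IP address.
  // None means neither the hosts file nor DNS could resolve the name.
  def resolve(host: String): Option[String] =
    Try(InetAddress.getByName(host).getHostAddress).toOption

  // Try to open a TCP connection, like `telnet host port`.
  // Returns true only if the connection is accepted within timeoutMs.
  def portOpen(host: String, port: Int, timeoutMs: Int = 3000): Boolean =
    Try {
      val socket = new Socket()
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess

  def main(args: Array[String]): Unit = {
    val host = "hadoop002" // host name used in the word count code
    val port = 6789        // port used in the word count code
    println(s"$host resolves to: ${resolve(host).getOrElse("<unresolved>")}")
    println(s"$host:$port reachable: ${portOpen(host, port)}")
  }
}
```

Note that portOpen only succeeds while something is listening on the port, so nc must already be running on the Linux side when the check is made.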


Step 2: Write the word count example in IDEA.

            

package com.imooc.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Processing socket data with Spark Streaming.
 *
 * Test with: nc
 */
object NetworkWordCount {

  /** Official description:
   * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
   *
   * Usage: NetworkWordCount <hostname> <port>
   * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
   *
   * To run this on your local machine, you need to first run a Netcat server
   * `$ nc -lk 9999`
   * and then run the example
   * `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
   */
  def main(args: Array[String]): Unit = {

    // local[2]: at least two threads are needed, one for the socket receiver and one for processing
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    // Creating a StreamingContext takes two parameters: the SparkConf and the batch interval
    // (5 seconds here, unlike the 1 second mentioned in the official description)
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Read lines of text from the nc server listening on hadoop002:6789
    val lines = ssc.socketTextStream("hadoop002", 6789)

    // Split each line into words, pair each word with 1, and sum the counts per word within each batch
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

pom.xml

<properties>
  <scala.version>2.11.8</scala.version>
  <spark.version>2.2.0</spark.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <!-- Spark Streaming dependency -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.5</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.11</artifactId>
    <version>2.6.5</version>
  </dependency>

  <dependency>
    <groupId>net.jpountz.lz4</groupId>
    <artifactId>lz4</artifactId>
    <version>1.3.0</version>
  </dependency>
</dependencies>

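Step 3: Start and test:

On Linux, run nc -lk 6789

Start the project from IDEA on Windows

In the Linux terminal running nc -lk 6789, type a b a b c d

The word counts appear in the IDEA console on Windows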
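
To see what to expect in the console, the per-batch computation can be reproduced on a plain Scala collection. This is only a sketch of the logic, not Spark code: groupBy followed by a sum stands in for reduceByKey, and the object name BatchWordCount is an assumption. It also assumes the whole line a b a b c d arrives within a single 5-second batch.

```scala
// Plain-Scala sketch of what one batch computes; no Spark dependency required.
object BatchWordCount {

  // Same steps as the streaming job: split lines on spaces,
  // pair each word with 1, then sum the 1s per word.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // The line typed into nc in the test above
    countWords(Seq("a b a b c d")).toSeq.sortBy(_._1).foreach(println)
    // prints (a,2), (b,2), (c,1), (d,1), one pair per line
  }
}
```

In the real job, print() also writes a Time header before each batch and does not sort the pairs, so the order in the IDEA console may differ.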


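Possible exception: connect timed out!

Causes:   1. The firewall has not been turned off.

          2. The Linux machine's IP-to-name mapping has not been added to the Windows hosts file.

          3. The host name was not set up correctly on the Linux machine, so the IP-to-name mapping has not taken effect.

          4. The IP/name or the port in the code is wrong.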
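
When connecting by hand, these causes tend to surface as different exceptions, which can narrow down where to look. The sketch below is a diagnostic aid under stated assumptions, not part of the original demo: the object name TimeoutDiagnosis and the messages are made up for illustration, and the mapping from exception to cause is only a rule of thumb, since a firewall may also reject a connection outright instead of dropping it.

```scala
import java.io.IOException
import java.net.{ConnectException, InetAddress, InetSocketAddress, Socket}
import java.net.{SocketTimeoutException, UnknownHostException}

// Hypothetical helper: classifies a failed connection attempt by exception type.
object TimeoutDiagnosis {

  def diagnose(host: String, port: Int, timeoutMs: Int = 3000): String = {
    val socket = new Socket()
    try {
      // Throws UnknownHostException if the name cannot be resolved
      val address = InetAddress.getByName(host)
      socket.connect(new InetSocketAddress(address, port), timeoutMs)
      "ok: connection accepted"
    } catch {
      case _: UnknownHostException =>
        "name not resolved: check the hosts files (causes 2 and 3)"
      case _: SocketTimeoutException =>
        "timed out: possibly the firewall or a wrong IP (causes 1 and 4)"
      case _: ConnectException =>
        "refused: host answered but nothing accepted the connection; check nc and the port (cause 4)"
      case e: IOException =>
        s"other I/O error: ${e.getMessage}"
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit =
    println(diagnose("hadoop002", 6789)) // host and port from the word count code
}
```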
