Website Hot Word Ranking Project


First, create a MySQL table to store the results.
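The original post only shows the table in a screenshot. A minimal sketch of the DDL is below; the database name (spark) and the column names (insert_time, keyword, search_count) come from the JDBC URL and the INSERT statement in HotWordBySort.scala further down, while the column types are assumptions:

-- Assumed schema; column names match the INSERT in HotWordBySort.scala,
-- column types are a guess.
CREATE DATABASE IF NOT EXISTS spark;
USE spark;
CREATE TABLE IF NOT EXISTS searchKeyWord (
    insert_time  DATETIME,      -- time the row was written (now() in the job)
    keyword      VARCHAR(64),   -- the hot word
    search_count INT            -- occurrences within the current window
);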


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.SparkStream</groupId>
    <artifactId>SparkStreamspace</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.3.2</spark.version>
    </properties>

    <dependencies>
        <!-- Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
        <!-- Kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.0</version>
        </dependency>
        <!-- Spark Streaming; the original listed duplicate scala-library and
             spark-core entries at 2.0.2, which conflict with spark-core 2.3.2
             above, so the versions are aligned to ${spark.version} here -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- The object below has no package declaration -->
                                    <mainClass>HotWordBySort</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- Spark 2.x requires Java 8; the original "6" would not compile -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

HotWordBySort.scala

import java.sql.{DriverManager, Statement}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object HotWordBySort {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("HotWordBySort").setMaster("local[2]")
    // 2. Create the SparkContext object
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3. Set the log level
    sc.setLogLevel("WARN")
    // 4. Create the StreamingContext from the SparkContext and the batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // 5. Connect to the socket service: host, port (and the default storage level)
    val dstream: ReceiverInputDStream[String] = ssc
      .socketTextStream("192.168.121.134", 9999)
    // 6. Split each line on the comma and keep the first field as the keyword
    val itemPairs: DStream[(String, Int)] = dstream.map(line => (line
      .split(",")(0), 1))
    // 7. reduceByKeyAndWindow takes three arguments: the reduce function,
    //    the window length (60s) and the slide interval (10s)
    val itemCount: DStream[(String, Int)] = itemPairs
      .reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(10))
    // 8. DStream has no sortByKey, so sort inside transform;
    //    false = descending, take(3) keeps the top 3
    val hotWord = itemCount.transform(itemRDD => {
      val top3: Array[(String, Int)] = itemRDD.map(pair => (pair._2, pair._1))
        .sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
      // 9. Turn the local collection (the top-3 hot words) back into an RDD
      ssc.sparkContext.makeRDD(top3)
    })
    // 10. Use foreachRDD to save the output into the MySQL table
    hotWord.foreachRDD(rdd => {
      val url = "jdbc:mysql://192.168.121.134:3306/spark"
      val user = "root"
      val password = "Dn@123456"
      Class.forName("com.mysql.jdbc.Driver")
      // Clear the previous window's top-3 before writing the new one
      val conn1 = DriverManager.getConnection(url, user, password)
      conn1.prepareStatement("delete from searchKeyWord where 1=1")
        .executeUpdate()
      conn1.close()
      rdd.foreachPartition(partitionOfRecords => {
        // This closure runs on the executors, so the driver is registered
        // and a separate connection is opened per partition
        // (JDBC connections are not serializable)
        Class.forName("com.mysql.jdbc.Driver")
        val conn2 = DriverManager.getConnection(url, user, password)
        conn2.setAutoCommit(false)
        val stat: Statement = conn2.createStatement()
        partitionOfRecords.foreach(record => {
          // The original had a stray space before the keyword value, fixed here
          stat.addBatch("insert into searchKeyWord (insert_time, keyword, search_count) values(now(),'" + record._1 + "','" + record._2 + "')")
        })
        stat.executeBatch()
        conn2.commit()
        conn2.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

Run nc -lk 9999 to start a server-side listener on the socket, then type some input.
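The job only uses the first comma-separated field of each line as the keyword (the rest is ignored), so any lines of the following shape will do; the values here are made-up examples:

hadoop,1001
spark,1002
hadoop,1003
hive,1004
spark,1005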

Run the program. Every 10 seconds (the slide interval) it recomputes the word counts over the most recent 60-second window and writes the top three words to MySQL.
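To verify the result, you can query the table while the job is running; a simple query such as the following (the ORDER BY is just for readability) shows the current top three:

SELECT keyword, search_count, insert_time
FROM searchKeyWord
ORDER BY search_count DESC;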
