Real-Time Business Data Analysis with Structured Streaming

First start spark-shell, and remember to start the nc service beforehand (for example `nc -lk 9999` on bigdata-pro01.kfk.com, the host and port that the code below connects to).

Enter the following code:


scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import spark.implicits._
import spark.implicits._

scala> val lines = spark.readStream.format("socket").option("host", "bigdata-pro01.kfk.com").option("port", 9999).load()
18/03/21 20:55:13 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
lines: org.apache.spark.sql.DataFrame = [value: string]

scala> val words = lines.as[String].flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val wordCounts = words.groupBy("value").count()
wordCounts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]

scala> val query = wordCounts.writeStream.outputMode("complete").format("console").start()
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@4e260e04
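A quick note on working interactively: start() returns a StreamingQuery handle that keeps running in the background of the spark-shell session, so you can inspect or stop it at any time. For example (these calls are illustrative, not part of the original session):

query.status        // current state of the streaming query
query.lastProgress  // metrics for the most recent micro-batch
query.stop()        // stop the query when you are done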

 

 

 Type a few words into the nc session.

 Now type a few more words.


Now let's change the code to use update mode.

First restart spark-shell, and remember to start nc again.
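The code is identical to the complete-mode example above; only the sink's output mode changes. Since the original screenshots are not reproduced here, a minimal sketch (same assumed host and port):

scala> import spark.implicits._

scala> val lines = spark.readStream.format("socket").option("host", "bigdata-pro01.kfk.com").option("port", 9999).load()

scala> val words = lines.as[String].flatMap(_.split(" "))

scala> val wordCounts = words.groupBy("value").count()

scala> val query = wordCounts.writeStream.outputMode("update").format("console").start()

In update mode the console shows only the rows whose counts changed in the latest trigger, rather than the full result table that complete mode reprints each time.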


Next, switch to append mode. Note that append mode does not support this running aggregation (without a watermark), so the example below simply outputs the words themselves instead of the counts.


scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import spark.implicits._
import spark.implicits._

scala> val lines = spark.readStream.format("socket").option("host", "bigdata-pro01.kfk.com").option("port", 9999).load()
18/03/21 21:32:30 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
lines: org.apache.spark.sql.DataFrame = [value: string]

scala> val words = lines.as[String].flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val query = words.writeStream.outputMode("append").format("console").start()
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@19d85bbe

 

 


Next we integrate Structured Streaming with Kafka. The Structured Streaming Kafka source needs Kafka 0.10 or later, and our existing Kafka version is too old, so we download version 0.10.0.

Download it from http://kafka.apache.org/downloads


 We copy the configuration files over from the Kafka 0.9 installation.

To save time, I do this directly inside the virtual machine.

Copy these configuration files,


and use them to overwrite the corresponding files in the Kafka 0.10 installation.


 Then adjust the configuration file.


 Distribute Kafka to the other two nodes.


On node 2 and node 3, adjust the corresponding configuration file as well:

server.properties


Create a new Scala class in IDEA.


Add the following code:


package com.spark.test

import org.apache.spark.sql.SparkSession

object StructuredStreamingKafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("streaming").getOrCreate()

    // Read the "weblogs" topic from Kafka as a streaming DataFrame
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")
      .option("subscribe", "weblogs")
      .load()

    import spark.implicits._
    // The Kafka value column is binary; cast it to a string
    val lines = df.selectExpr("CAST(value AS STRING)").as[String]

    // Split each message into words and count the occurrences of each word
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()

    // Print the full result table to the console after every trigger
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

 

Run the program.


 If it fails with an error saying that the Kafka 0.10 version is required, you can ignore it for now.

Start Kafka.

 You can see that the program is now running.

Create a console producer in Kafka:


bin/kafka-console-producer.sh --broker-list bigdata-pro01.kfk.com:9092 --topic weblogs

 

 

 Type a few words.

You can see the results on the IDEA side.

We can switch this program to update mode as well.
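The change was only shown in a screenshot; presumably it amounts to swapping the output mode in the writeStream call of the program above:

val query = wordCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()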

 The program is up and running.

Type some words.

 Here is the output.

 Upload the required package to the cluster (do this on all 3 nodes); presumably this is the Structured Streaming Kafka connector jar that spark-shell will need.

Start spark-shell (with the uploaded jar on its classpath, for example via --jars).

Paste in the following code:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")
  .option("subscribe", "weblogs")
  .load()

import spark.implicits._
val lines = df.selectExpr("CAST(value AS STRING)").as[String]

val words = lines.flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()

query.awaitTermination()

 

 At this point, make sure Kafka and the console producer are still running.

 Type a few words on the producer side.

 Back in the spark-shell window, you can see the word counts.

Next we write the statistics into MySQL instead of the console. First, clear out the contents of the webCount table in the MySQL test database.

Open IDEA; we will write the following two classes.

package com.spark.test

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime

/**
 * Created by Administrator on 2017/10/16.
 */
object StructuredStreamingKafka {

  // One parsed weblog record; the fields arrive comma-separated in each Kafka message
  case class Weblog(datatime: String,
                    userid: String,
                    searchname: String,
                    retorder: String,
                    cliorder: String,
                    cliurl: String)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("streaming").getOrCreate()

    // Read the "weblogs" topic from Kafka
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")
      .option("subscribe", "weblogs")
      .load()

    import spark.implicits._
    val lines = df.selectExpr("CAST(value AS STRING)").as[String]
    // Parse each line into a Weblog record
    val weblog = lines.map(_.split(","))
      .map(x => Weblog(x(0), x(1), x(2), x(3), x(4), x(5)))
    // Count how many times each search title appears
    val titleCount = weblog
      .groupBy("searchname").count().toDF("titleName", "count")

    val url = "jdbc:mysql://bigdata-pro01.kfk.com:3306/test"
    val username = "root"
    val password = "root"

    // Write every updated row to MySQL through the custom JDBCSink, triggering every 5 seconds
    val writer = new JDBCSink(url, username, password)
    val query = titleCount.writeStream
      .foreach(writer)
      .outputMode("update")
      //.format("console")
      .trigger(ProcessingTime("5 seconds"))
      .start()
    query.awaitTermination()
  }

}

 

 

package com.spark.test

import java.sql._
import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.{ForeachWriter, Row}

/**
 * Created by Administrator on 2017/10/17.
 * A ForeachWriter that upserts (titleName, count) rows into the MySQL webCount table.
 */
class JDBCSink(url: String, username: String, password: String) extends ForeachWriter[Row] {

  var statement: Statement = _
  var resultSet: ResultSet = _
  var connection: Connection = _

  // Called once per partition/epoch: open the JDBC connection
  override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName("com.mysql.jdbc.Driver")
    // connection = new MySqlPool(url,username,password).getJdbcConn();
    connection = DriverManager.getConnection(url, username, password)
    statement = connection.createStatement()
    true
  }

  // Called for every output row: update the count if the title exists, otherwise insert it
  override def process(value: Row): Unit = {
    val titleName = value.getAs[String]("titleName").replaceAll("[\\[\\]]", "")
    val count = value.getAs[Long]("count")

    val querySql = "select 1 from webCount " +
      "where titleName = '" + titleName + "'"

    val updateSql = "update webCount set " +
      "count = " + count + " where titleName = '" + titleName + "'"

    val insertSql = "insert into webCount(titleName,count)" +
      "values('" + titleName + "'," + count + ")"

    try {
      val resultSet = statement.executeQuery(querySql)
      if (resultSet.next()) {
        statement.executeUpdate(updateSql)
      } else {
        statement.execute(insertSql)
      }
    } catch {
      // SQLException and RuntimeException must come before the generic Exception case,
      // otherwise those cases would be unreachable
      case ex: SQLException => println("SQLException: " + ex.getMessage)
      case ex: RuntimeException => println("RuntimeException: " + ex.getMessage)
      case ex: Exception => println("Exception: " + ex.getMessage)
      case ex: Throwable => println("Throwable: " + ex.getMessage)
    }
  }

  // Called when the writer is done: release JDBC resources
  override def close(errorOrNull: Throwable): Unit = {
    // if(resultSet.wasNull()){
    //   resultSet.close()
    // }
    if (statement != null) {
      statement.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

}
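A side note on the design: process() builds its SQL by string concatenation, so a titleName containing a quote character would break the statement. A hedged sketch of the same insert-or-update logic written with PreparedStatement (an alternative, not part of the original code) could look like this:

override def process(value: Row): Unit = {
  val titleName = value.getAs[String]("titleName").replaceAll("[\\[\\]]", "")
  val count = value.getAs[Long]("count")

  // Parameterized statements let the JDBC driver handle quoting/escaping of titleName
  val select = connection.prepareStatement("select 1 from webCount where titleName = ?")
  select.setString(1, titleName)
  val rs = select.executeQuery()
  if (rs.next()) {
    val update = connection.prepareStatement("update webCount set count = ? where titleName = ?")
    update.setLong(1, count)
    update.setString(2, titleName)
    update.executeUpdate()
  } else {
    val insert = connection.prepareStatement("insert into webCount(titleName, count) values(?, ?)")
    insert.setString(1, titleName)
    insert.setLong(2, count)
    insert.executeUpdate()
  }
}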

 

Add this dependency to the pom.xml file:


<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

 

 

A note on choosing this dependency's version: it is best to match the version already deployed in your cluster, otherwise you may run into errors; you can check the jars under Hive's lib directory for reference.

 

 

 

 

 Make sure HDFS, HBase, YARN, and ZooKeeper are all up and running on the cluster.


 Start Flume on node 1 and node 2. Before starting, adjust the Flume configuration: because the JDK and Kafka versions were changed earlier, the configuration files need to be updated (do this on all 3 nodes).


 Start Flume on node 1.


 Start Kafka on node 1:

bin/kafka-server-start.sh config/server.properties


Start Flume on node 2.


On node 2, start the data generation so that log data is produced in real time.


 Back in IDEA, run the program.


Note that the program is running without errors now; because I did not allocate enough memory to IDEA earlier, it runs quite slowly.


Back in MySQL, query the webCount table: data is already coming in.


The Chinese titles in the table come out garbled, so we modify the MySQL configuration file (my.cnf) as follows to default to UTF-8:


[client]
socket=/var/lib/mysql/mysql.sock
default-character-set=utf8

[mysqld]
character-set-server=utf8
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

[mysql]
default-character-set=utf8

[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid

 

 

 Drop the old webCount table.


 Recreate it with an explicit utf8 character set:


create table webCount( titleName varchar(255) CHARACTER SET utf8 DEFAULT NULL, count int(11) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

 

Run the program once more.


As you can see, the Chinese characters are no longer garbled.

 

We can also connect to MySQL with a GUI client and check the table there.
