一、数据计算步骤汇总
下面我们通过文字梳理一下具体的数据计算步骤。
第一步:历史粉丝关注数据初始化
第二步:实时维护粉丝关注数据
第三步:每天定时更新主播等级
第四步:每天定时更新用户活跃时间
第五步:每周一计算最近一月主播视频评级
第六步:每周一计算最近一周内主播主播的三度关系列表。
第七步:三度关系列表数据导出到MySQL
二、实时维护粉丝关注数据
1、数据格式分析
我们的实时粉丝关注数据来源于服务端日志,因为当用户在直播平台中对主播进行关注和取消关注的时候会调用服务端接口,所以服务端会记录这些操作的日志。
具体的数据格式如下:
{"followeruid":"1001","followuid":"1002","timestamp":1798198304,"type":"user_follow","desc":"follow"}
followeruid:关注者uid,代表粉丝
followuid:被关注者uid,代表主播
timestamp:表示数据产生的时间
type:user_follow 表示是关注数据
desc:follow表示关注,还有一种是unfollow表示取消关注
这就是粉丝关注数据的基本格式
2、创建项目
使用SparkStreaming实时维护Neo4j中粉丝关注数据
创建model子项目 real_time_follow
在项目中创建scala目录,引入scala2.11版本的SDK
(1)添加依赖
在父项目db_video_recommend的pom.xml中添加spark相关依赖
<!-- spark相关依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<!-- neo4j相关依赖 -->
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>neo4j-contrib</groupId>
<artifactId>neo4j-spark-connector</artifactId>
<version>2.4.5-M1</version>
</dependency>
以及添加一个repository,因为neo4j-spark-connector这个依赖在maven中央仓库中是没有的。
<repositories>
<!-- list of other repositories -->
<repository>
<id>SparkPackagesRepo</id>
<url>http://dl.bintray.com/spark-packages/maven</url>
</repository>
</repositories>
然后在real_time_follow的pom.xml中引入spark streaming相关的依赖即可,去掉依赖的版本号。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
注意:在这个项目中需要使用代码操作neo4j,就类似于操作mysql一样,所以需要找到neo4j的驱动jar包。
进到neo4j的官网
注意:其实我们直接到maven仓库中直接搜neo4j-java-driver也是可以的。
2、编写代码
在real_time_follow的scala目录下创建包com.imooc.spark
创建类:RealTimeFollowScala
代码如下
package com.imooc.spark
import com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase, Transaction, TransactionWork}
/**
* 任务2:
* 实时维护粉丝关注数据
*
*/
object RealTimeFollowScala {
def main(args: Array[String]): Unit = {
//创建StreamingContext
val conf = new SparkConf().setMaster("local[2]").setAppName("RealTimeFollowScala")
val ssc = new StreamingContext(conf, Seconds(5))
//指定Kafka的配置信息
val kafkaParams = Map[String,Object](
//kafka的broker地址信息
"bootstrap.servers"->"bigdata01:9092,bigdata02:9092,bigdata03:9092",
//key的序列化类型
"key.deserializer"->classOf[StringDeserializer],
//value的序列化类型
"value.deserializer"->classOf[StringDeserializer],
//消费者组id
"group.id"->"con_1",
//消费策略
"auto.offset.reset"->"latest",
//自动提交offset
"enable.auto.commit"->(true: java.lang.Boolean)
)
//指定要读取的topic的名称
val topics = Array("user_follow")
//获取消费kafka的数据流
val kafkaDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
//处理数据
//首先将kafkaDStream转换为rdd,然后就可以调用rdd中的foreachPartition了
kafkaDStream.foreachRDD(rdd=>{
//一次处理一个分区的数据
rdd.foreachPartition(it=>{
//获取neo4j连接
val driver = GraphDatabase.driver("bolt://bigdata04:7687", AuthTokens.basic("neo4j", "admin"))
//开启一个会话
val session = driver.session()
it.foreach(record=>{
//获取粉丝关注相关数据
//{"followeruid":"1001","followuid":"1002","timestamp":1798198304,"type":"user_follow","desc":"follow"}
val line = record.value()
//解析数据
val userFollowObj = JSON.parseObject(line)
//获取数据类型,关注 or 取消关注
val followType = userFollowObj.getString("desc")
//获取followeruid
val followeruid = userFollowObj.getString("followeruid")
//获取followuid
val followuid = userFollowObj.getString("followuid")
if("follow".equals(followType)){
//添加关注:因为涉及多条命令,所以需要使用事务
session.writeTransaction(new TransactionWork[Unit] (){
override def execute(tx: Transaction): Unit = {
try{
tx.run("merge (:User {uid:'"+followeruid+"'})")
tx.run("merge (:User {uid:'"+followuid+"'})")
tx.run("match(a:User {uid:'"+followeruid+"'}),(b:User {uid:'"+followuid+"'}) merge (a) -[:follow]-> (b)")
//提交事务
tx.commit()
}catch {
case ex: Exception => tx.rollback()
}
}
})
}else{
//取消关注
session.run("match(:User {uid: '"+followeruid+"'}) -[r:follow]-> (:User {uid: '"+followuid+"'}) delete r")
}
})
//关闭会话
session.close()
//关闭连接
driver.close()
})
})
//启动任务
ssc.start()
//等待任务停止
ssc.awaitTermination()
}
}
3、环境准备
确保kafka、filebeat采集程序、flume数据分发程序,以及服务端接口服务正常运行。
然后在本地idea中运行sparkstreaming程序。
(1)启动zookeeper
zkServer.sh start
(2)启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
(3)启动hadoop
start-all.sh
(4)启动采集数据服务
1)data_collect
启动jar包
nohup java -jar data_collect-1.0-SNAPSHOT.jar &
进行测试
curl -XGET 'http://localhost:8080/v1/t1?name=test'
2)server_inter
启动jar包
nohup java -jar server_inter-1.0-SNAPSHOT.jar &
进行测试
curl -XGET 'http://localhost:8081/s1/t1?name=test'
(5)启动flume
按照如下顺序启动
nohup bin/flume-ng agent --conf conf-user-active/ --conf-file conf-user-active/kafka-hdfs-user-active.conf --name agent &
nohup bin/flume-ng agent --conf conf-video-info/ --conf-file conf-video-info/kafka-hdfs-video-info.conf --name agent &
nohup bin/flume-ng agent --conf conf-kafka-kafka/ --conf-file conf-kafka-kafka/kafka-to-kafka.conf --name agent &
进行验证
jps -ml
(6)启动filebeat
./filebeat -c filebeat.yml
(7)连接到kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic all_type_data_r2p40 --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic default_r2p5 --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user_active --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user_follow --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic video_info --from-beginning
4、执行代码
再执行代码:GenerateRealTimeFollowData模拟生成粉丝关注数据。
[INFO] 接口调用成功:{"followeruid":"2004","followuid":"2008","timestamp":1598771070069,"type":"user_follow","desc":"follow"}
此时到neo4j中查看结果
说明这个程序是OK的,这样就可以实时维护粉丝的关注数据了。
但是注意:目前这个程序其实是存在一些问题的,因为数据通过filebeat采集、再到kafka,最终我们消费的数据顺序和数据产生的顺序可能就不一致了。
举个例子:
用户A先关注了用户B
用户A很快又取关了用户B
这样就会产生两条日志数据,这两条数据经过采集分发之后,最后我们消费过来的数据顺序很有可能是这样的
用户A取关用户B
用户A关注了用户B
这种最终的结果就是用户A关注了用户B,那这样就不准确了,虽然这种情况是小概率事件,但是也是存在的,在SparkStreaming中如何解决呢?
由于SparkStreaming是一小批一小批处理的,所以我们可以针对每次获取的这一小批数据根据数据产生的时间戳进行排序,从小到大,然后按照这个顺序去操作这些数据,这样其实就能在很大程度上避免我们刚才分析的这种问题了,但是这样并没有完全解决掉这个问题,如果两条数据分到了两批数据里面,还是会存在这个问题的,不过这种情况出现的概率就很低了,我们暂时就忽略不计了。
在这我把这个思路分析好了,给大家留一个作业,大家下去之后自己尝试动手实现一下这个功能。
5、完善代码支持本地和集群
最后我们来对这个代码再完善一下,让它同时支持在本地运行和集群运行。
完善后的代码如下:
package com.imooc.spark
import com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase, Transaction, TransactionWork}
/**
* 任务2:
* 实时维护粉丝关注数据
*
*/
object RealTimeFollowScala {
def main(args: Array[String]): Unit = {
var masterUrl = "local[2]"
var appName = "RealTimeFollowScala"
var seconds = 5
var kafkaBrokers = "bigdata01:9092,bigdata02:9092,bigdata03:9092"
var groupId = "con_1"
var topic = "user_follow"
var boltUrl = "bolt://bigdata04:7687"
var username = "neo4j"
var password = "admin"
if(args.length > 0){
masterUrl = args(0)
appName = args(1)
seconds = args(2).toInt
kafkaBrokers = args(3)
groupId = args(4)
topic = args(5)
boltUrl = args(6)
username = args(7)
password = args(8)
}
//创建StreamingContext
val conf = new SparkConf().setMaster(masterUrl).setAppName(appName)
val ssc = new StreamingContext(conf, Seconds(seconds))
//指定Kafka的配置信息
val kafkaParams = Map[String,Object](
//kafka的broker地址信息
"bootstrap.servers"->kafkaBrokers,
//key的序列化类型
"key.deserializer"->classOf[StringDeserializer],
//value的序列化类型
"value.deserializer"->classOf[StringDeserializer],
//消费者组id
"group.id"->groupId,
//消费策略
"auto.offset.reset"->"latest",
//自动提交offset
"enable.auto.commit"->(true: java.lang.Boolean)
)
//指定要读取的topic的名称
val topics = Array(topic)
//获取消费kafka的数据流
val kafkaDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
//处理数据
//首先将kafkaDStream转换为rdd,然后就可以调用rdd中的foreachPartition了
kafkaDStream.foreachRDD(rdd=>{
//一次处理一个分区的数据
rdd.foreachPartition(it=>{
//获取neo4j连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
it.foreach(record=>{
//获取粉丝关注相关数据
//{"followeruid":"1001","followuid":"1002","timestamp":1798198304,"type":"user_follow","desc":"follow"}
val line = record.value()
//解析数据
val userFollowObj = JSON.parseObject(line)
//获取数据类型,关注 or 取消关注
val followType = userFollowObj.getString("desc")
//获取followeruid
val followeruid = userFollowObj.getString("followeruid")
//获取followuid
val followuid = userFollowObj.getString("followuid")
if("follow".equals(followType)){
//添加关注:因为涉及多条命令,所以需要使用事务
session.writeTransaction(new TransactionWork[Unit](){
override def execute(tx: Transaction): Unit = {
tx.run("merge (:User {uid: '"+followeruid+"'})")
tx.run("merge (:User {uid: '"+followuid+"'})")
tx.run("match(a:User {uid: '"+followeruid+"'}),(b:User {uid: '"+followuid+"'}) merge (a)-[:follow]->(b)")
tx.commit()
}
})
}else{
//取消关注
session.run("match(:User {uid: '"+followeruid+"'}) -[r:follow]-> (:User {uid: '"+followuid+"'}) delete r")
/*session.writeTransaction(new TransactionWork[Unit](){
override def execute(tx: Transaction): Unit = {
tx.run("match(:User {uid: '"+followeruid+"'}) -[r:follow]-> (:User {uid: '"+followuid+"'}) delete r")
tx.commit()
}
})*/
}
})
//关闭会话
session.close()
//关闭连接
driver.close()
})
})
//启动任务
ssc.start()
//等待任务停止
ssc.awaitTermination()
}
}
6、编译打包
(1)首先添加打包配置
注意:建议这个项目中的所有依赖包全部在spark-submit脚本后面的–jars中指定,这样最终生成的任务jar就比较小了,提交任务的时候速度会比较快。所以这里面的jar-with-dependencies插件就可以不使用了,因为我们打jar包的时候不需要把依赖打进去,这个时候也不需要在依赖中添加provided参数了。
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打包插件 -->
<!--<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>-->
</plugins>
</build>
(2)进行打包
D:\IdeaProjects\db_video_recommend>cd real_time_follow
D:\IdeaProjects\db_video_recommend\real_time_follow>mvn clean package -DskipTests
[INFO] Scanning for projects...
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ real_time_follow ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend\real_time_follow\target\real_time_follow-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.465s
[INFO] Final Memory: 23M/619M
[INFO] ------------------------------------------------------------------------
7、编写脚本方便提交任务
为了方便的提交任务,我们再开发一个任务提交脚本,在项目中创建一个bin目录,把脚本放到这个bin目录一样,这样便于管理维护
startRealTimeFollow.sh
#!/bin/bash
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`
appName="RealTimeFollowScala"
seconds=5
kafkaBrokers="bigdata01:9092,bigdata02:9092,bigdata03:9092"
groupId="con_1"
topic="user_follow"
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"
yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"
spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.RealTimeFollowScala \
--jars ${yarnCommonLib}/fastjson-1.2.68.jar,${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/spark-streaming-kafka-0-10_2.11-2.4.3.jar,${yarnCommonLib}/kafka-clients-2.4.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar \
/data/soft/video_recommend/jobs/real_time_follow-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${seconds} ${kafkaBrokers} ${groupId} ${topic} ${boltUrl} ${username} ${password}
注意:
针对这个参数:yarnCommonLib=“hdfs://bigdata01:9000/yarnCommonLib”
使用公共的依赖包目录,使用起来方便,管理维护起来也方便,并且还可以提高任务执行效率,因为当我们向集群提交的时候,任务需要的依赖jar包是会自动上传到hdfs的一个临时目录的,如果我们提前把jar包上传到hdfs上面,就不会再重新上传了
针对–jars后面指定的依赖jar包,需要额外再指定kafka-clients-2.4.1.jar和reactive-streams-1.0.3.jar,一个是kafka的,一个是neo4j需要依赖的,否则提交任务到集群执行是会报错的。
公共yarnCommonLib下载地址:
链接:https://pan.baidu.com/s/12mSvRShhlBlRa4KerjSPYQ?pwd=5jcj
提取码:5jcj
8、上传依赖的jar包
接下来把项目需要依赖的jar包先上传到bigdata04上,然后再上传到hdfs上面
[root@bigdata04 video_recommend]# mkdir yarnCommonLib
[root@bigdata04 video_recommend]# cd yarnCommonLib
[root@bigdata04 yarnCommonLib]# ll
total 8520
-rw-r--r--. 1 root root 670583 May 25 2020 fastjson-1.2.68.jar
-rw-r--r--. 1 root root 3269712 Jun 9 2020 kafka-clients-2.4.1.jar
-rw-r--r--. 1 root root 4550360 Aug 30 2020 neo4j-java-driver-4.1.1.jar
-rw-r--r--. 1 root root 11369 Aug 30 2020 reactive-streams-1.0.3.jar
-rw-r--r--. 1 root root 216502 Aug 12 2020 spark-streaming-kafka-0-10_2.11-2.4.3.jar
[root@bigdata04 yarnCommonLib]# hdfs dfs -put *.jar /yarnCommonLib
[root@bigdata04 yarnCommonLib]# hdfs dfs -ls /yarnCommonLib
Found 5 items
-rw-r--r-- 2 root supergroup 670583 2026-02-14 16:28 /yarnCommonLib/fastjson-1.2.68.jar
-rw-r--r-- 2 root supergroup 3269712 2026-02-14 16:38 /yarnCommonLib/kafka-clients-2.4.1.jar
-rw-r--r-- 2 root supergroup 4550360 2026-02-14 16:28 /yarnCommonLib/neo4j-java-driver-4.1.1.jar
-rw-r--r-- 2 root supergroup 11369 2026-02-14 16:50 /yarnCommonLib/reactive-streams-1.0.3.jar
-rw-r--r-- 2 root supergroup 216502 2026-02-14 16:28 /yarnCommonLib/spark-streaming-kafka-0-10_2.11-2.4.3.jar
9、上传项目jar包和脚本
然后再把项目生成的jar包和提交脚本都上传到bigdata04上面
[root@bigdata04 video_recommend]# mkdir jobs
[root@bigdata04 video_recommend]# cd jobs
[root@bigdata04 jobs]# ll
total 16
-rw-r--r--. 1 root root 10468 Aug 30 2020 real_time_follow-1.0-SNAPSHOT.jar
-rw-r--r--. 1 root root 1043 Feb 14 16:51 startRealTimeFollow.sh
10、提交任务
把任务提交到集群执行。
[root@bigdata04 jobs]# sh -x startRealTimeFollow.sh
重新再执行GenerateRealTimeFollowData生成一条粉丝关注数据,最后到neo4j中验证结果,结果ok就说明任务在集群中是正常执行的。
11、停止任务
停止sparkStreaming任务
[root@bigdata04 jobs]# yarn application -kill application_1771056714339_0007
这是正常的一帆风顺的流程。
12、遇到问题思路
那我们如果是第一次这样做,肯定会遇到各种各样的问题,在这里来给大家复现一下。
按照正常的思路,这个项目依赖的jar包最开始我们肯定只会使用这三个
spark-streaming-kafka-0-10_2.11
fastjson
neo4j-java-driver
修改一下startRealTimeFollow.sh 脚本
#!/bin/bash
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`
appName="RealTimeFollowScala"
seconds=5
kafkaBrokers="bigdata01:9092,bigdata02:9092,bigdata03:9092"
groupId="con_1"
topic="user_follow"
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"
yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"
spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.RealTimeFollowScala \
--jars ${yarnCommonLib}/fastjson-1.2.68.jar,${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/spark-streaming-kafka-0-10_2.11-2.4.3.jar \
/data/soft/video_recommend/jobs/real_time_follow-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${seconds} ${kafkaBrokers} ${groupId} ${topic} ${boltUrl} ${username} ${password}
执行这个脚本
[root@bigdata04 jobs]# sh -x startRealTimeFollow.sh
此时你会发现任务执行失败了
报错信息如下,在控制台就可以看到具体的报错信息:
这个报错信息表示是缺少kafka的一些依赖,org/apache/kafka/common/serialization/StringDeserializer,通过这里面的包名可以看出来,这个是kafka-clients的依赖,如果直接看不出来,那就到网上搜一下这个报错信息看看有没有收获java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
2026-02-14 17:16:39,311 ERROR yarn.Client: Application diagnostics message: User class threw exception: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
所以我们在脚本中再添加这个kafka-client的依赖
#!/bin/bash
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`
appName="RealTimeFollowScala"
seconds=5
kafkaBrokers="bigdata01:9092,bigdata02:9092,bigdata03:9092"
groupId="con_1"
topic="user_follow"
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"
yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"
spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.RealTimeFollowScala \
--jars ${yarnCommonLib}/fastjson-1.2.68.jar,${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/spark-streaming-kafka-0-10_2.11-2.4.3.jar,${yarnCommonLib}/kafka-clients-2.4.1.jar \
/data/soft/video_recommend/jobs/real_time_follow-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${seconds} ${kafkaBrokers} ${groupId} ${topic} ${boltUrl} ${username} ${password}
再执行这个脚本
[root@bigdata04 jobs]# sh -x startRealTimeFollow.sh
发现任务还是执行失败,在控制台可以看到报错信息如下
java.lang.NoClassDefFoundError: Could not initialize class org.neo4j.driver.Config
这里提示neo4j中的 org.neo4j.driver.Config初始化失败,但是neo4j的依赖我们也添加进去了,为什么还会报这个错呢?
如果这里看不到有用的错误信息,我们可以尝试到YARN中查看
在YARN的8088界面中我们进入到spark的任务界面
注意:需要确保hadoop的日志聚合功能开启,以及Spark的historyServer进程也是开启的。
其实根源是在这,主要是缺少这个类org.reactivestreams.Publisher,最终导致的org.neo4j.driver.Config无法初始化。
那这个类org.reactivestreams.Publisher是从哪来的呢?
它是neo4j需要使用的一个依赖里面的类
到哪找呢?
查看neo4j-java-driver这个依赖的依赖,会发现它里面确实有一个依赖的名称是org.reactivestreams,所以说我们还需要把这个依赖的jar包找到引进去。
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
<scope>compile</scope>
</dependency>
到此为止,把这个jar包再加进去就可以正常执行了,这就是我们排查问题的一个思路和流程,这个思路大家一定要学以致用。
这个任务开发到这就结束了。