一、数据计算步骤汇总
下面我们通过文字梳理一下具体的数据计算步骤。
第一步:历史粉丝关注数据初始化
第二步:实时维护粉丝关注数据
第三步:每天定时更新主播等级
第四步:每天定时更新用户活跃时间
第五步:每周一计算最近一月主播视频评级
第六步:每周一计算最近一周内主播主播的三度关系列表。
第七步:三度关系列表数据导出到MySQL
二、每周一计算最近一周内主播主播的三度关系列表
1、分析
前面我们在neo4j中维护了粉丝和主播的一些信息,在这里我们就需要基于neo4j中的数据统计主播的三度关系推荐列表了
这个任务在这也是每周计算一次,我们测试过,每天都计算的话,最终的结果变化是不大的,所以就没必要每天计算了。
2、创建项目
创建子module项目:get_recommend_list
创建scala目录,引入scala2.11版本的sdk
在scala目录中创建包:com.imooc.spark
(1)引入依赖
引入依赖,这里面需要额外用到spark-sql和neo4j-spark-connector这两个依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>neo4j-contrib</groupId>
<artifactId>neo4j-spark-connector</artifactId>
</dependency>
注意:在使用spark读取neo4j中数据的时候,可以使用一个插件,在官网中可以找到
咱们前面使用的neo4j-java-driver相当于是使用原生代码操作neo4j,而现在使用neo4j-spark-connector相当于把neo4j封装到spark中了,使用起来比较方便。
在使用neo4j-spark-connector的时候,选择哪个版本呢?
点这里进去看一下
这个版本是基于neo4j 4.0,我们现在使用的neo4j是3.5的,这种一般是向下兼容的,所以操作neo4j 3.5也是可以的,后面写的spark是2.4.5,这个也是可以的,我们使用的spark是2.4.3的,最后一位版本号不一致没问题。
目前最新版本是基于scala2.12版本编译的,我们在spark项目中使用的scala版本是2.11,所以使用2.4.5-M1这个版本。
注意:在使用这个依赖的时候,还需要配置它对应的repository,因为这个依赖没有在maven仓库中,把这些配置添加到父项目的pom.xml文件中
注意!!这里引入依赖会出现一点问题,改为如下:
因为:
目前neo4j-spark-connector对应的源仓库有问题,禁止访问,所以导致这个依赖无法正常下载。
其实在工作中也会遇到类似这样的问题。
所以这里依赖为:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>neo4j-contrib</groupId>
<artifactId>neo4j-spark-connector</artifactId>
<version>2.4.5-M1</version>
<scope>system</scope>
<systemPath>D:/neo4j-spark-connector-2.4.5-M1.jar</systemPath>
</dependency>
</dependencies>
neo4j-spark-connector-2.4.5-M1.jar下载地址:
链接:https://pan.baidu.com/s/1fcv-bamdXOoUtqRogU7-3w?pwd=emzw
提取码:emzw
(2)开发代码
接下来我们来开发代码
创建类:GetRecommendListScala
代码如下:
package com.imooc.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import org.neo4j.spark.Neo4j
import scala.collection.mutable.ArrayBuffer
/**
* 任务6:
* 每周一计算最近一周内主活主播的三度关系列表
* 注意:
* 1:待推荐主播最近一周内活跃过
* 2:待推荐主播等级>4
* 3:待推荐主播最近1个月视频评级满足3B+或2A+ (flag=1)
* 4:待推荐主播的粉丝列表关注重合度>2
* Created by xuwei
*/
object GetRecommendListScala {
def main(args: Array[String]): Unit = {
var masterUrl = "local[2]"
var appName = "GetRecommendListScala"
var boltUrl = "bolt://bigdata04:7687"
var username = "neo4j"
var password = "admin"
var timestamp = 0L //过滤最近一周内是否活跃过
var duplicateNum = 2 //粉丝列表关注重合度
var level = 4 //主播等级
var outputPath = "hdfs://bigdata01:9000/data/recommend_data/20260201"
if(args.length > 0){
masterUrl = args(0)
appName = args(1)
boltUrl = args(2)
username = args(3)
password = args(4)
timestamp = args(5).toLong
duplicateNum = args(6).toInt
level = args(7).toInt
outputPath = args(8)
}
//获取SparkContext
val conf = new SparkConf()
.setAppName(appName)
.setMaster(masterUrl)
.set("spark.driver.allowMultipleContexts","true")
.set("spark.neo4j.url", boltUrl)
.set("spark.neo4j.user",username)
.set("spark.neo4j.password",password)
val sc = new SparkContext(conf)
//获取一周内主活的主播 并且主播等级大于4的数据
var params = Map[String,Long]()
params += ("timestamp"->timestamp)
params += ("level"->level)
val neo4j: Neo4j = Neo4j(sc).cypher("match (a:User ) where a.timestamp >= {timestamp} and a.level >= {level} return a.uid").params(params)
//将从neo4j中查询出来的数据转换为rowRDD
val rowRDD = neo4j.loadRowRdd
//一次处理一批
//过滤出粉丝关注重合度>2的数据,并且对关注重合度倒序排列
//最终的数据格式是:主播id,待推荐的主播id
val mapRDD = rowRDD.mapPartitions(it=>{
//获取neo4j连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
//保存计算出来的结果
val resultArr = ArrayBuffer[String]()
it.foreach(row=>{
val uid = row.getString(0)
// 计算某一个用户的三度关系(主播的二度关系)
// 注意:数据量大了之后,这个计算操作是非常耗时的。
//val result = session.run("match (a:User {uid:\""+uid+"\"}) <-[:follow]- (b:User) -[:follow]-> (c:User) return a.uid as auid,c.uid as cuid,count(c.uid) as sum order by sum desc limit 30")
//对b、c的主活时间进行过滤,以及对c的level和flag值进行过滤
val result = session.run("match (a:User {uid:\""+uid+"\"}) <-[:follow]- (b:User) -[:follow]-> (c:User) where b.timestamp >= "+timestamp+" and c.timestamp >= "+timestamp+" and c.level >="+level+" and c.flag = 1 return a.uid as auid,c.uid as cuid,count(c.uid) as sum order by sum desc limit 30")
while(result.hasNext){
val record = result.next()
val sum = record.get("sum").asInt()
if(sum>duplicateNum){
resultArr+=record.get("auid").asString()+"\t"+record.get("cuid").asString()
}
}
})
//关闭会话
session.close()
//关闭连接
driver.close()
resultArr.iterator
})
//把数据转成tuple2的形式
val tup2RDD = mapRDD.map(line => {
val splits = line.split("\t")
(splits(0),splits(1))
})
//根据主播id进行分组,可以获取到这个主播的待推荐列表
val reduceRDD = tup2RDD.reduceByKey((v1,v2)=>{
v1+","+v2
})
//最终把结果组装成这种形式
//10001 1002,1003,1004
reduceRDD.map(tup=>{
tup._1+"\t"+tup._2
}).repartition(1).saveAsTextFile(outputPath)
}
}
(3)先不进行条件过滤
先使用这一行代码,在计算三度关系数据的时候暂时先不进行条件过滤。
val result = session.run("match (a:User {uid:\""+uid+"\"}) <-[:follow]- (b:User) -[:follow]-> (c:User) return a.uid as auid,c.uid as cuid,count(c.uid) as sum order by sum desc limit 30")
执行代码,验证结果如下:
[root@bigdata04 neo4j-community-3.5.21]# hdfs dfs -cat /data/recommend_data/20260201/part-00000
1005 1000
1000 1005,1004
1004 1000
到neo4j中验证一下,确实是正确的。
(4)进行条件过滤
接着使用这一行代码,在计算三度关系数据的对数据进行条件过滤
val result = session.run("match (a:User {uid:\""+uid+"\"}) <-[:follow]- (b:User) -[:follow]-> (c:User) where b.timestamp >= "+timestamp+" and c.timestamp >= "+timestamp+" and c.level >="+level+" and c.flag = 1 return a.uid as auid,c.uid as cuid,count(c.uid) as sum order by sum desc limit 30")
执行代码,验证结果如下:
[root@bigdata04 neo4j-community-3.5.21]# hdfs dfs -cat /data/recommend_data/20260201/part-00000
1005 1000
1000 1004
1004 1000
到neo4j中验证一下,确实是正确的,因为1005的flag不为1,被过滤掉了,所以我在关注1000这个主播的时候平台只需要给我推荐主1004这个主播即可。
(5)重新设置rowRDD的分区
注意:这个代码在执行mapPartitions的时候,最好把rowRDD的分区重新设置一下,如果让程序自动分配的话可能不太合理,分多了分少了都不太好
由于我们在mapPartitions中需要操作neo4j,所以这个时候rowRDD分区的数量就可以等于(neo4j服务器的CPU数量-1),要给neo4j预留出来一个cpu来处理其它任务请求。
我们当时的服务器是8个CPU,给neo4j预留出来一个,剩下还有7个,所以说,neo4j此时可以对外提供的最大并发处理能力是7,那我们就把rowRDD设置为7个分区,就会有7个线程并行处理数据,它们可以并行操作neo4j,这样效率最高。
如果给rowRDD设置的分区太多,对应的就会有多个线程并行操作neo4j,会给neo4j造成很大的压力,相当于neo4j在满负荷的运行,这个时候我们另外一个实时维护neo4j中粉丝关注数据的程序执行起来就很慢了,以及其他人如果这个时候想查询neo4j,也会非常慢,所以这样就不太好了。
如果给rowRDD设置的分区太少,对应产生的执行线程就比较少,此时neo4j会比较空闲,没有多大压力,但是我们这个三度关系的任务执行就非常慢了。
综上所述,建议把rowRDD的分区数量设置为7,这样可以充分利用neo4j服务器的性能,也不至于把neo4j服务器拖垮。
//将从neo4j中查询出来的数据转换为rowRDD
//val rowRDD = neo4j.loadRowRdd
//repartition 这里的repartition是为了把数据分为7份,这样下面的mapPartitions在执行的时候就有7个线程,这7个线程并行查询neo4j数据库
val rowRDD = neo4j.loadRowRdd.repartition(7)
(6)优化
还有一点可以优化的,增加RDD持久化,把RDD数据缓存起来,这样可以避免个别task失败导致的数据重算,因为这个计算还是比较消耗时间的,所以说尽可能保证计算出来的数据不丢失。
val mapRDD = rowRDD.mapPartitions(it=>{
…
…
}).persist(StorageLevel.MEMORY_AND_DISK)//把RDD数据缓存起来
(7)思考
问题:大家有没有想过,我们是否可以直接在Neo4j(sc).cypher(…)中指定一条查询语句,直接把所有的三度关系全部查询出来?
这样理论上是可以的,但是在实际中,当neo4j中存储的节点数和关系数量达到千万级别之后,同时查询所有满足条件主播的三度关系推荐列表的时候会很慢,有时候会导致等了几十分钟也查询不出来数据,所以在这我们就把这个功能进行了拆解,先查看满足条件的主播uid列表,然后再一个一个计算这些主播的三度关系推荐列表,这样可以提高计算效率,并且不会出现查询不出来结果的情况。
下面我们将任务提交到集群上面去执行。
3、打包配置
先添加编译打包配置
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
4、进行打包
打jar包
D:\IdeaProjects\db_video_recommend\get_recommend_list>mvn clean package -DskipTests
[INFO] Scanning for projects...
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ get_recommend_list ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend\get_recommend_list\target\get_recommend_list-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.455s
[INFO] Final Memory: 22M/475M
[INFO] ------------------------------------------------------------------------
5、开发脚本
开发任务提交脚本
startGetRecommendList.sh
#!/bin/bash
#默认获取上周一的时间
dt=`date -d "7 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=`date -d "7 days ago $1" +"%Y%m%d"`
fi
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`
# 组装一个唯一的名称
appName="GetRecommendListScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"
#获取上周一的时间戳(单位:毫秒)
timestamp=`date --date="${dt}" +%s`000
#粉丝列表关注重合度
duplicateNum=2
#主播等级
level=4
#输出结果数据路径
outputPath="hdfs://bigdata01:9000/data/recommend_data/${dt}"
yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"
spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.GetRecommendListScala \
--jars ${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar,${yarnCommonLib}/neo4j-spark-connector-2.4.5-M1.jar \
/data/soft/video_recommend/jobs/get_recommend_list-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${boltUrl} ${username} ${password} ${timestamp} ${duplicateNum} ${level} ${outputPath}
#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi
6、上传jar包和脚本
将项目jar包和任务执行脚本上传到bigdata04机器上
[root@bigdata04 jobs]# ll
-rw-r--r--. 1 root root 11857 Aug 31 2020 get_recommend_list-1.0-SNAPSHOT.jar
-rw-r--r--. 1 root root 1508 Aug 31 2020 startGetRecommendList.sh
7、提交任务、验证
向集群中提交任务
[root@bigdata04 jobs]# sh -x startGetRecommendList.sh 20260201
到hdfs中验证结果,看到这个结果,说明任务在集群中正常执行了。
[root@bigdata04 neo4j-community-3.5.21]# hdfs dfs -cat /data/recommend_data/20260125/*
1005 1000
1000 1004
1004 1000