一、数据计算步骤汇总
下面我们通过文字梳理一下具体的数据计算步骤。
第一步:历史粉丝关注数据初始化
第二步:实时维护粉丝关注数据
第三步:每天定时更新主播等级
第四步:每天定时更新用户活跃时间
第五步:每周一计算最近一月主播视频评级
第六步:每周一计算最近一周内主播主播的三度关系列表。
第七步:三度关系列表数据导出到MySQL
二、每周一计算最近一月主播视频评级
1、数据分析
视频数据来源于服务端,当主播开播结束后会产生一条视频数据
数据格式:
{"id":"1769913943534","uid":"1000","nickname":"jack94","gold":284,"watchnumpv":284,"watchnumuv":284,"hosts":284,"nofollower":284,"looktime":284,"smlook":284,"follower":284,"gifter":284,"length":384, "area":"A_US","rating":"A","exp":284,"timestamp":1769913940000,"type":"video_info"}
2、生成数据
之前我们通过埋点模拟上报数据,通过flume落盘到hdfs上面,这样在hdfs上面产生的目录会使用当天日期,为了保证我这里使用的目录和大家都保持一致,所以在这我就生成一个固定的日期目录
使用代码GenerateVideoInfoDataV2,在代码中指定日期 2026-02-01,这样会把模拟生成的用户活跃数据直接上传到hdfs上面,因为之前的数据采集流程我们已经详细分析过了,所以在这就直接把数据上传到hdfs上面了。
执行代码:GenerateVideoInfoDataV2,将会把数据上传到hdfs的这个目录下
hdfs://bigdata01:9000/data/video_info/20260201/
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /data/video_info/20260201
Found 1 items
-rw-r--r-- 3 yehua supergroup 2699 2026-02-14 21:32 /data/video_info/20260201/video_info-2026-02-01.log
这个任务需要做的就是统计最近一个月内主播的视频评级信息
在这我们先初始化一天的数据即可,计算一天和计算一个月的数据,计算逻辑是一样的,只有spark任务的输入路径不一样
如果是一个月的数据,假设这一个月有30天,则需要把这30天对应的30个目录使用逗号分隔,拼接成一个字符串,作为Spark任务的输入即可。
为什么这个任务要每周计算一次,而不是每天计算一次呢?
因为很多主播不会每天都开播,所以我们每天都计算意义不大,均衡考虑之后按照每周计算一次这个频率。
3、创建项目
创建子module项目:update_video_info
创建scala目录,引入scala2.11版本的sdk
在scala目录中创建包:com.imooc.spark
(1)引入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
(2) 创建代码
创建类:UpdateVideoInfoScala
代码如下:
package com.imooc.spark
import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import org.slf4j.LoggerFactory
/**
* 任务5:
* 每周一计算最近一个月主播视频评级
* 把最近几次视频评级在3B+或2A+的主播,在neo4j中设置flag=1
*
* 注意:在执行程序之前需要先把flag=1的重置为0
*
*/
object UpdateVideoInfoScala {
val logger = LoggerFactory.getLogger("UpdateVideoInfo")
def main(args: Array[String]): Unit = {
var masterUrl = "local"
var appName = "UpdateVideoInfo"
var filePath = "hdfs://bigdata01:9000/data/video_info/20260201"
var boltUrl = "bolt://bigdata04:7687"
var username = "neo4j"
var password = "admin"
if(args.length > 0){
masterUrl = args(0)
appName = args(1)
filePath = args(2)
boltUrl = args(3)
username = args(4)
password = args(5)
}
//在Driver端执行此代码,将flag=1的重置为0
//获取neo4j连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
session.run("match(a:User) where a.flag=1 set a.flag=0")
//关闭会话
session.close()
//关闭连接
driver.close()
//获取SparkContext
val conf = new SparkConf()
.setAppName(appName)
.setMaster(masterUrl)
val sc = new SparkContext(conf)
//读取视频评级数据
val linesRDD = sc.textFile(filePath)
//解析数据中的uid,rating,timestamp
val tup3RDD = linesRDD.map(line => {
try {
val jsonObj = JSON.parseObject(line)
val uid = jsonObj.getString("uid")
val rating = jsonObj.getString("rating")
val timestamp: Long = jsonObj.getLong("timestamp")
(uid, rating, timestamp)
} catch {
case ex: Exception => logger.error("json数据解析失败:" + line)
("0", "0", 0L)
}
})
//过滤掉异常数据
val filterRDD = tup3RDD.filter(_._2 != "0")
//获取用户最近3场直播的评级信息
val top3RDD = filterRDD.groupBy(_._1).map(group=>{
val top3 = group._2.toList.sortBy(_._3).reverse.take(3).mkString("\t")
(group._1,top3)
})
//过滤出来满足3场B+的数据
val top3BRDD = top3RDD.filter(tup => {
var flag = false
val fields = tup._2.split("\t")
if (fields.length == 3) {
//3场B+,表示里面没有出现C和D
val tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1) + "," + fields(2).split(",")(1)
if (!tmp_str.contains("C") && !tmp_str.contains("D")) {
flag = true
}
}
flag
})
//把满足3场B+的数据更新到neo4j中,增加一个字段flag,flag=1表示是视频评级满足条件的主播,允许推荐给用户
//注意:针对3场B+的数据还需要额外再限制一下主播等级,主播等级需要>=15,这样可以保证筛选出来的主播尽可能是一些优质主播
top3BRDD.foreachPartition(it=>{
//获取neo4j连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
it.foreach(tup=>{
session.run("match(a:User {uid: '"+tup._1+"'}) where a.level >=15 set a.flag = 1")
})
//关闭会话
session.close()
//关闭连接
driver.close()
})
//过滤出来满足2场A+的数据
val top2ARDD = top3RDD.filter(tup=>{
var flag = false
val fields = tup._2.split("\t")
if (fields.length >= 2) {
//2场A+,获取最近两场直播评级,里面不能出现B、C、D
val tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1)
if (!tmp_str.contains("B") && !tmp_str.contains("C") && !tmp_str.contains("D")) {
flag = true
}
}
flag
})
//把满足2场A+的数据更新到neo4j中,增加一个字段flag,flag=1表示是视频评级满足条件的主播,允许推荐给用户
//注意:针对2场A+的数据还需要额外再限制一下主播等级,主播等级需要>=4,这样可以保证筛选出来的主播尽可能是一些优质主播
top2ARDD.foreachPartition(it=>{
//获取neo4j连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
it.foreach(tup=>{
session.run("match(a:User {uid: '"+tup._1+"'}) where a.level >=4 set a.flag = 1")
})
//关闭会话
session.close()
//关闭连接
driver.close()
})
}
}
4、本地执行
在本地执行代码
然后到neo4j的web界面查看结果,发现只有uid为1005的数据对应的flag不等于1(没有flag属性)
这样是正确的。
5、开发提交任务脚本
下面开发任务执行脚本
注意:这个脚本中需要实现获取最近一个月的数据目录
startUpdateVideoInfo.sh
#!/bin/bash
# 获取最近一个月的文件目录
#filePath=""
#for((i=1;i<=30;i++))
#do
# filePath+="hdfs://bigdata01:9000/data/video_info/"`date -d "$i days ago" +"%Y%m%d"`,
#done
#默认获取昨天时间
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=$1
fi
#HDFS输入数据路径
filePath="hdfs://bigdata01:9000/data/video_info/${dt}"
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`
# 组装一个唯一的名称
appName="UpdateVideoInfoScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"
yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"
spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.UpdateVideoInfoScala \
--jars ${yarnCommonLib}/fastjson-1.2.68.jar,${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar \
/data/soft/video_recommend/jobs/update_video_info-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${filePath} ${boltUrl} ${username} ${password}
#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi
6、配置打包
对项目代码编译打包,在pom.xml中添加打包配置
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
7、打包
打jar包
D:\IdeaProjects\db_video_recommend\update_video_info>mvn clean package -DskipTests
[INFO] Scanning for projects...
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ update_video_info ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend\update_video_info\target\update_video_info-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.793s
[INFO] Final Memory: 23M/619M
[INFO] ------------------------------------------------------------------------
8、上传jar包和脚本
将jar包和任务执行脚本上传到bigdata04机器上面
[root@bigdata04 jobs]# ll
-rw-r--r--. 1 root root 1461 Aug 31 2020 startUpdateVideoInfo.sh
-rw-r--r--. 1 root root 17242 Aug 31 2020 update_video_info-1.0-SNAPSHOT.jar
9、提交任务、验证
向集群中提交任务
[root@bigdata04 jobs]# sh -x startUpdateVideoInfo.sh 20260201
到集群中验证任务执行状态,发现任务执行成功,此时neo4j中的数据还是老样子,因为刚才我们已经在本地执行过一次了,重复再执行对结果没影响。