一、数据计算步骤汇总
下面我们通过文字梳理一下具体的数据计算步骤。
第一步:历史粉丝关注数据初始化
第二步:实时维护粉丝关注数据
第三步:每天定时更新主播等级
第四步:每天定时更新用户活跃时间
第五步:每周一计算最近一月主播视频评级
第六步:每周一计算最近一周内主播主播的三度关系列表。
第七步:三度关系列表数据导出到MySQL
二、每天定时更新主播等级
1、数据分析
主播等级数据来源于服务端数据库(定时增量导入到HDFS中)
数据格式:6 1000 380 4 2016-07-05 17:48:22 2026-02-01 11:43:18 0 18
注意:表中有两个等级字段,一个是用户等级,一个是主播等级
在这我们需要使用主播等级
针对这份数据,最核心的两个字段是第2列和第4列
第2列是用户uid
第4列是主播等级anchor_level
这个任务需要做的是把每天主播等级发生了变化的数据更新到neo4j中,在neo4j中也维护一份主播的等级
2、创建项目
创建一个子module:update_user_level
创建scala目录,添加scala2.11的sdk
(1)引入依赖
引入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
在update_user_level下面的scala里面创建包:com.imooc.spark
创建类:UpdateUserLevelScala
代码如下
package com.imooc.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}
/**
* 任务3:
* 每天定时更新主播等级
*
*/
object UpdateUserLevelScala {
def main(args: Array[String]): Unit = {
var masterUrl = "local"
var appName = "RealTimeFollowScala"
var filePath = "hdfs://bigdata01:9000/data/cl_level_user/20260201"
var boltUrl = "bolt://bigdata04:7687"
var username = "neo4j"
var password = "admin"
if(args.length > 0){
masterUrl = args(0)
appName = args(1)
filePath = args(2)
boltUrl = args(3)
username = args(4)
password = args(5)
}
//获取SparkContext
val conf = new SparkConf()
.setAppName(appName)
.setMaster(masterUrl)
val sc = new SparkContext(conf)
//读取用户等级数据
val linesRDD = sc.textFile(filePath)
//校验数据准确性
val filterRDD = linesRDD.filter(line=>{
val fields = line.split("\t")
//判断每一行的列数是否正确,以及这一行是不是表头
if(fields.length==8 && !fields(0).equals("id")){
true
}else{
false
}
})
//处理数据
filterRDD.foreachPartition(it=>{
//获取neo4j连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
it.foreach(line=>{
//6 1000 380 4 2016-07-05 17:48:22 2026-02-01 11:43:18 0 18
val fields = line.split("\t")
//添加等级
session.run("merge(u:User {uid: '"+fields(1).trim+"'}) set u.level = "+fields(3).trim)
})
//关闭会话
session.close()
//关闭连接
driver.close()
})
}
}
在本地执行代码
3、验证neo4j
效果如下:
4、提交任务脚本
开发任务执行脚本
startUpdateUserLevel.sh
#!/bin/bash
#默认获取昨天时间
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=$1
fi
#HDFS输入数据路径
filePath="hdfs://bigdata01:9000/data/cl_level_user/${dt}"
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`
# 组装一个唯一的名称
appName="UpdateUserLevelScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"
yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"
spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.UpdateUserLevelScala \
--jars ${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar \
/data/soft/video_recommend/jobs/update_user_level-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${filePath} ${boltUrl} ${username} ${password}
#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi
5、添加打包配置
对项目代码编译打包,在pom.xml中添加打包配置
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
6、打包
打jar包
D:\IdeaProjects\db_video_recommend\update_user_level>mvn clean package -DskipTests
[INFO] Scanning for projects...
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ update_user_level ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend\update_user_level\target\update_user_level-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.165s
[INFO] Final Memory: 23M/619M
[INFO] ------------------------------------------------------------------------
7、上传jar包和脚本
将jar包和任务执行脚本上传到bigdata04机器上面
[root@bigdata04 jobs]# ll
-rw-r--r--. 1 root root 1206 Aug 30 2020 startUpdateUserLevel.sh
-rw-r--r--. 1 root root 7400 Aug 30 2020 update_user_level-1.0-SNAPSHOT.jar
8、提交任务
向集群中提交任务
[root@bigdata04 jobs]# sh -x startUpdateUserLevel.sh 20260201
到集群中验证任务执行状态,发现任务执行成功,此时neo4j中的数据还是老样子,因为刚才我们已经在本地执行过一次了,重复再执行对结果没影响。