0
点赞
收藏
分享

微信扫一扫

FileBeat + Flume + Kafka + HDFS + Neo4j + SparkStreaming + MySQL:【案例】三度关系推荐V1.0版本10:每天定时更新用户活跃时间

千妈小语 2022-03-19 阅读 34
hadoopspark

一、数据计算步骤汇总

下面我们通过文字梳理一下具体的数据计算步骤。
第一步:历史粉丝关注数据初始化
第二步:实时维护粉丝关注数据
第三步:每天定时更新主播等级
第四步:每天定时更新用户活跃时间
第五步:每周一计算最近一月主播视频评级
第六步:每周一计算最近一周内主播主播的三度关系列表。
第七步:三度关系列表数据导出到MySQL

二、每天定时更新用户活跃时间

1、数据分析

数据来源于客户端上报,每天只要打开过APP就会上报数据
数据格式:

{"uid":"1000","mcc":"452","countryCode":"VN","ver":"3.6.41","UnixtimeStamp":"1769912170000","ip":"171.247.0.154","type":"user_active"}

2、生成数据

之前我们通过埋点模拟上报数据,通过flume落盘到hdfs上面,这样在hdfs上面产生的目录会使用当天日期,为了保证我这里使用的目录和大家都保持一致,所以在这我就生成一个固定的日期目录
使用代码GenerateUserActiveDataV2,在代码中指定日期 2026-02-01,这样会把模拟生成的用户活跃数据直接上传到hdfs上面,因为之前的数据采集流程我们已经详细分析过了,所以在这就直接把数据上传到hdfs上面了。

执行代码:GenerateUserActiveDataV2,将会把数据上传到hdfs的这个目录下
hdfs://bigdata01:9000/data/user_active/20260201/

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /data/user_active/20260201
Found 1 items
-rw-r--r--   3 yehua supergroup       2699 2026-02-14 21:32 /data/user_active/20260201/user_active-2026-02-01.log

这个任务需要做的是把每天主动活跃的用户更新到neo4j中,在neo4j中维护一份用户的最新活跃时间

3、创建项目

创建子module项目:update_user_active
创建scala目录,引入scala2.11版本的sdk
在scala目录中创建包:com.imooc.spark

(1)引入依赖

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
	<groupId>org.neo4j.driver</groupId>
	<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>fastjson</artifactId>
</dependency>

(2)创建代码

创建类:UpdateUserActiveScala
代码如下:

package com.imooc.spark

import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}

/**
 * 任务4:
 * 每天定时更新用户活跃时间
 * 	 */
object UpdateUserActiveScala{
  def main(args: Array[String]): Unit = {
    var masterUrl = "local"
    var appName = "UpdateUserActive"
    var filePath = "hdfs://bigdata01:9000/data/user_active/20260201"
    var boltUrl = "bolt://bigdata04:7687"
    var username = "neo4j"
    var password = "admin"
    if(args.length > 0){
      masterUrl = args(0)
      appName = args(1)
      filePath = args(2)
      boltUrl = args(3)
      username = args(4)
      password = args(5)
    }

    //获取SparkContext
    val conf = new SparkConf()
      .setAppName(appName)
      .setMaster(masterUrl)
    val sc = new SparkContext(conf)

    //读取用户活跃数据
    val linesRDD = sc.textFile(filePath)

    //处理数据
    linesRDD.foreachPartition(it=>{
      //获取neo4j连接
      val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
      //开启一个会话
      val session = driver.session()
      it.foreach(line=>{
        //{"uid":"1000","mcc":"452","countryCode":"VN","ver":"3.6.41","UnixtimeStamp":"1769912170000","ip":"171.247.0.154","type":"user_active"}
        val jsonObj = JSON.parseObject(line)
        val uid = jsonObj.getString("uid")
        val timeStamp = jsonObj.getString("UnixtimeStamp")
        //添加用户活跃时间
        session.run("merge(u:User {uid: '"+uid+"'}) set u.timestamp = "+timeStamp)
      })
      //关闭会话
      session.close()
      //关闭连接
      driver.close()
    })

  }
}

4、指行代码进行验证

在本地执行代码,查看neo4j中的数据,结果如下:

在这里插入图片描述

5、开发提交任务脚本

下面开发任务执行脚本
startUpdateUserActive.sh

#!/bin/bash
#默认获取昨天时间
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
    dt=$1
fi
#HDFS输入数据路径
filePath="hdfs://bigdata01:9000/data/user_active/${dt}"


masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`

# 组装一个唯一的名称
appName="UpdateUserActiveScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"

yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"

spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.UpdateUserActiveScala \
--jars ${yarnCommonLib}/fastjson-1.2.68.jar,${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar \
/data/soft/video_recommend/jobs/update_user_active-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${filePath} ${boltUrl} ${username} ${password}

#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
    echo "任务执行失败"
    # 发送短信或者邮件
else
    echo "任务执行成功"
fi

6、打包配置

对项目代码编译打包,在pom.xml中添加打包配置

<build>
<plugins>
    <!-- java编译插件 -->
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.0</version>
        <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <encoding>UTF-8</encoding>
        </configuration>
    </plugin>
    <!-- scala编译插件 -->
    <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.1.6</version>
        <configuration>
            <scalaCompatVersion>2.11</scalaCompatVersion>
            <scalaVersion>2.11.12</scalaVersion>
        </configuration>
        <executions>
            <execution>
                <id>compile-scala</id>
                <phase>compile</phase>
                <goals>
                    <goal>add-source</goal>
                    <goal>compile</goal>
                </goals>
            </execution>
            <execution>
                <id>test-compile-scala</id>
                <phase>test-compile</phase>
                <goals>
                    <goal>add-source</goal>
                    <goal>testCompile</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
</plugins>
</build>

7、进行打包

打jar包

D:\IdeaProjects\db_video_recommend\update_user_level>mvn clean package -DskipTests
[INFO] Scanning for projects...
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ update_user_level ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend\update_user_active\target\update_user_active-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.165s
[INFO] Final Memory: 23M/619M
[INFO] ------------------------------------------------------------------------

将jar包和任务执行脚本上传到bigdata04机器上面

[root@bigdata04 jobs]# ll
-rw-r--r--. 1 root root  1279 Aug 30  2020 startUpdateUserActive.sh
-rw-r--r--. 1 root root  6559 Aug 30  2020 update_user_active-1.0-SNAPSHOT.jar

8、提交任务

向集群中提交任务

[root@bigdata04 jobs]# sh -x startUpdateUserActive.sh 20260201

9、验证

到集群中验证任务执行状态,发现任务执行成功,此时neo4j中的数据还是老样子,因为刚才我们已经在本地执行过一次了,重复再执行对结果没影响。

在这里插入图片描述

举报

相关推荐

0 条评论