FileBeat + Flume + Kafka + HDFS + Neo4j + Flink + Redis: [Case Study] Three-Degree Relationship Recommendation V2.0, Part 04: Updating Anchor Levels on a Daily Schedule

Just_Esme · 2022-03-19

I. Summary of the Data Computation Steps

Let's first walk through the concrete data computation steps.
Step 1: Initialize the historical follower data
Step 2: Maintain follower data in real time
Step 3: Update anchor levels on a daily schedule
Step 4: Update user active times on a daily schedule
Step 5: Every Monday, compute each anchor's video rating for the last month
Step 6: Every Monday, compute the three-degree relationship list of anchors for the last week
Step 7: Export the three-degree relationship list data to Redis

Code download:

Link: https://pan.baidu.com/s/1kzuwD3XarH26_roq255Yyg?pwd=559p
Extraction code: 559p

II. Updating Anchor Levels on a Daily Schedule

We use a Flink program to update anchor levels on a daily schedule.

1. Create the project

Create a child module project: update_user_level
Create a scala directory in the module and add the Scala 2.12 SDK
Create the package: com.imooc.flink

2. Add dependencies

Add the following dependencies to pom.xml (no version numbers are specified here, so they are presumably managed by the parent project's dependencyManagement):

<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.neo4j.driver</groupId>
        <artifactId>neo4j-java-driver</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
    </dependency>
</dependencies>

3. Add the log4j configuration

Add a log4j.properties configuration file to the resources directory.
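
The article does not list the contents of log4j.properties; a minimal console-only configuration such as the sketch below is usually enough for local debugging (the log level chosen here is just an assumption):

# minimal console logging; the info level is only an example
log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n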

4. Create UpdateUserLevelScala

Create the class UpdateUserLevelScala with the following code:

package com.imooc.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.neo4j.driver.{AuthTokens, GraphDatabase}


/**
 * Task 3:
 * Update anchor levels on a daily schedule
 *
 */
object UpdateUserLevelScala {

  def main(args: Array[String]): Unit = {
    var filePath = "hdfs://bigdata01:9000/data/cl_level_user/20260201"
    var boltUrl = "bolt://bigdata04:7687"
    var userName = "neo4j"
    var passWord = "admin"
    if(args.length > 0){
      filePath = args(0)
      boltUrl = args(1)
      userName = args(2)
      passWord = args(3)
    }

    val env = ExecutionEnvironment.getExecutionEnvironment
    //Read the data from HDFS
    val text = env.readTextFile(filePath)

    //Validate the data
    val filterSet = text.filter(line => {
      val fields = line.split("\t")
      //Check that the line has the expected number of columns and is not the header row
      if (fields.length == 8 && !fields(0).equals("id")) {
        true
      } else {
        false
      }
    })

    //Add the implicit conversions required by the Scala API
    import org.apache.flink.api.scala._

    //Process the data
    filterSet.mapPartition(it => {
      //Get a Neo4j connection (one driver per partition)
      val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(userName, passWord))
      //Open a session
      val session = driver.session()
      it.foreach(line => {
        val fields = line.split("\t")
        //Set the level on the user node (uid is in column 2, level in column 4)
        session.run("merge(u:User {uid:'" + fields(1).trim + "'}) set u.level = " + fields(3).trim)
      })
      //Close the session
      session.close()
      //Close the driver
      driver.close()
      ""
    }).print()
  }

}

Use the data generated earlier:
hdfs://bigdata01:9000/data/cl_level_user/20260201
Run the code locally, then check in Neo4j whether the User nodes now have a level property; if they do, the program ran successfully.
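
As a quick spot check you can also query Neo4j from code. The sketch below is a hypothetical helper, not part of the course code; it assumes the same bolt URL and credentials used above, and simply prints a few users that already carry the level property:

package com.imooc.flink

import org.neo4j.driver.{AuthTokens, GraphDatabase}

//Hypothetical verification helper: list up to 10 users that already have a level property
object CheckUserLevelScala {

  def main(args: Array[String]): Unit = {
    val driver = GraphDatabase.driver("bolt://bigdata04:7687", AuthTokens.basic("neo4j", "admin"))
    val session = driver.session()
    val result = session.run("match (u:User) where u.level is not null return u.uid as uid, u.level as level limit 10")
    while (result.hasNext) {
      val record = result.next()
      println(record.get("uid").asString() + "\t" + record.get("level"))
    }
    session.close()
    driver.close()
  }

}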


5. Packaging configuration

Next, compile and package the program.
Add the build and packaging configuration to pom.xml. Note that the dependencies are now marked as provided (except the Neo4j driver), so that the Flink, Hadoop, and SLF4J classes already available on the cluster are not bundled into the fat jar:

<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.neo4j.driver</groupId>
        <artifactId>neo4j-java-driver</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- Scala compiler plugin -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <configuration>
                <scalaCompatVersion>2.12</scalaCompatVersion>
                <scalaVersion>2.12.11</scalaVersion>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Jar packaging plugin (bundles all dependencies) -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- Optionally set the jar's main class -->
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

6. Build the jar

Build the jar:
D:\IdeaProjects\db_video_recommend_v2\update_user_level>mvn clean package -DskipTests
[INFO] Scanning for projects...
.......
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ update_user_level ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend_v2\update_user_level\target\update_user_level-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-assembly-plugin:2.6:single (make-assembly) @ update_user_level ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend_v2\update_user_level\target\update_user_level-1.0-SNAPSHOT-jar-with-dependencies.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.082s
[INFO] Final Memory: 43M/889M
[INFO] ------------------------------------------------------------------------


7. Write the submission script

Create the job submission script startUpdateUserLevel.sh:

#!/bin/bash
#Default to yesterday's date
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=$1
fi

#HDFS input data path
filePath="hdfs://bigdata01:9000/data/cl_level_user/${dt}"

masterUrl="yarn-cluster"
appName="UpdateUserLevelScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
userName="neo4j"
passWord="admin"

#Note: the flink script directory must be on the Linux PATH
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 2 \
-c com.imooc.flink.UpdateUserLevelScala \
/data/soft/video_recommend_v2/jobs/update_user_level-1.0-SNAPSHOT-jar-with-dependencies.jar ${filePath} ${boltUrl} ${userName} ${passWord}

#Check the final status of the job
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $8}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
    echo "job failed"
    # send an SMS or email alert here
else
    echo "job succeeded"
fi
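
To actually trigger this once a day, the script can be scheduled with cron. The entry below is only an illustrative sketch: the 01:00 start time and the log path are assumptions, while the jobs directory matches the one used above.

# crontab -e entry: run the update at 01:00 every day
0 1 * * * sh /data/soft/video_recommend_v2/jobs/startUpdateUserLevel.sh >> /data/soft/video_recommend_v2/jobs/startUpdateUserLevel.log 2>&1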

8. Upload the jar

Upload the jar and the script to the jobs directory:
[root@bigdata04 jobs]# ll
-rw-r--r--. 1 root root      667 Sep  6  2020 startUpdateUserLevel.sh
-rw-r--r--. 1 root root  4564216 Sep  6  2020 update_user_level-1.0-SNAPSHOT-jar-with-dependencies.jar


9. Submit the job

Submit the job to the cluster:
[root@bigdata04 jobs]# sh -x startUpdateUserLevel.sh 20260201

The job completed successfully.

