0
点赞
收藏
分享

微信扫一扫

FileBeat + Flume + Kafka + HDFS + Neo4j + SparkStreaming + MySQL:【案例】三度关系推荐V1.0版本12:最近一周内主播主播的三度关系列表

得一道人 2022-03-19 阅读 87
hadoopspark

一、数据计算步骤汇总

下面我们通过文字梳理一下具体的数据计算步骤。
第一步:历史粉丝关注数据初始化
第二步:实时维护粉丝关注数据
第三步:每天定时更新主播等级
第四步:每天定时更新用户活跃时间
第五步:每周一计算最近一月主播视频评级
第六步:每周一计算最近一周内主播主播的三度关系列表。
第七步:三度关系列表数据导出到MySQL

二、每周一计算最近一周内主播主播的三度关系列表

1、分析

前面我们在neo4j中维护了粉丝和主播的一些信息,在这里我们就需要基于neo4j中的数据统计主播的三度关系推荐列表了

这个任务在这也是每周计算一次,我们测试过,每天都计算的话,最终的结果变化是不大的,所以就没必要每天计算了。

2、创建项目

创建子module项目:get_recommend_list
创建scala目录,引入scala2.11版本的sdk
在scala目录中创建包:com.imooc.spark

(1)引入依赖

引入依赖,这里面需要额外用到spark-sql和neo4j-spark-connector这两个依赖

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
	<groupId>org.neo4j.driver</groupId>
	<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
	<groupId>neo4j-contrib</groupId>
	<artifactId>neo4j-spark-connector</artifactId>
</dependency>

注意:在使用spark读取neo4j中数据的时候,可以使用一个插件,在官网中可以找到

在这里插入图片描述
在这里插入图片描述
咱们前面使用的neo4j-java-driver相当于是使用原生代码操作neo4j,而现在使用neo4j-spark-connector相当于把neo4j封装到spark中了,使用起来比较方便。

在使用neo4j-spark-connector的时候,选择哪个版本呢?
点这里进去看一下

在这里插入图片描述
这个版本是基于neo4j 4.0,我们现在使用的neo4j是3.5的,这种一般是向下兼容的,所以操作neo4j 3.5也是可以的,后面写的spark是2.4.5,这个也是可以的,我们使用的spark是2.4.3的,最后一位版本号不一致没问题。
目前最新版本是基于scala2.12版本编译的,我们在spark项目中使用的scala版本是2.11,所以使用2.4.5-M1这个版本。

在这里插入图片描述
注意:在使用这个依赖的时候,还需要配置它对应的repository,因为这个依赖没有在maven仓库中,把这些配置添加到父项目的pom.xml文件中

在这里插入图片描述
在这里插入图片描述
注意!!这里引入依赖会出现一点问题,改为如下:

在这里插入图片描述
因为:
目前neo4j-spark-connector对应的源仓库有问题,禁止访问,所以导致这个依赖无法正常下载。
其实在工作中也会遇到类似这样的问题。

所以这里依赖为:

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.neo4j.driver</groupId>
            <artifactId>neo4j-java-driver</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>neo4j-contrib</groupId>
            <artifactId>neo4j-spark-connector</artifactId>
            <version>2.4.5-M1</version>
            <scope>system</scope>
            <systemPath>D:/neo4j-spark-connector-2.4.5-M1.jar</systemPath>
        </dependency>
    </dependencies>

neo4j-spark-connector-2.4.5-M1.jar下载地址:

链接:https://pan.baidu.com/s/1fcv-bamdXOoUtqRogU7-3w?pwd=emzw 
提取码:emzw 

(2)开发代码

接下来我们来开发代码
创建类:GetRecommendListScala
代码如下:

package com.imooc.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import org.neo4j.spark.Neo4j

import scala.collection.mutable.ArrayBuffer

/**
 * 任务6:
 * 每周一计算最近一周内主活主播的三度关系列表
 * 注意:
 * 1:待推荐主播最近一周内活跃过
 * 2:待推荐主播等级>4
 * 3:待推荐主播最近1个月视频评级满足3B+或2A+ (flag=1)
 * 4:待推荐主播的粉丝列表关注重合度>2
 * Created by xuwei
 */
object GetRecommendListScala {
  def main(args: Array[String]): Unit = {
    var masterUrl = "local[2]"
    var appName = "GetRecommendListScala"
    var boltUrl = "bolt://bigdata04:7687"
    var username = "neo4j"
    var password = "admin"
    var timestamp = 0L //过滤最近一周内是否活跃过
    var duplicateNum = 2 //粉丝列表关注重合度
    var level = 4 //主播等级
    var outputPath = "hdfs://bigdata01:9000/data/recommend_data/20260201"
    if(args.length > 0){
      masterUrl = args(0)
      appName = args(1)
      boltUrl = args(2)
      username = args(3)
      password = args(4)
      timestamp = args(5).toLong
      duplicateNum = args(6).toInt
      level = args(7).toInt
      outputPath = args(8)
    }

    //获取SparkContext
    val conf = new SparkConf()
      .setAppName(appName)
      .setMaster(masterUrl)
      .set("spark.driver.allowMultipleContexts","true")
      .set("spark.neo4j.url", boltUrl)
      .set("spark.neo4j.user",username)
      .set("spark.neo4j.password",password)
    val sc = new SparkContext(conf)

    //获取一周内主活的主播 并且主播等级大于4的数据
    var params = Map[String,Long]()
    params += ("timestamp"->timestamp)
    params += ("level"->level)
    val neo4j: Neo4j = Neo4j(sc).cypher("match (a:User )  where a.timestamp >= {timestamp} and a.level >= {level}  return a.uid").params(params)


    //将从neo4j中查询出来的数据转换为rowRDD
    val rowRDD = neo4j.loadRowRdd

    //一次处理一批
    //过滤出粉丝关注重合度>2的数据,并且对关注重合度倒序排列
    //最终的数据格式是:主播id,待推荐的主播id
    val mapRDD = rowRDD.mapPartitions(it=>{
      //获取neo4j连接
      val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
      //开启一个会话
      val session = driver.session()
      //保存计算出来的结果
      val resultArr = ArrayBuffer[String]()
      it.foreach(row=>{
        val uid = row.getString(0)
        // 计算某一个用户的三度关系(主播的二度关系)
        // 注意:数据量大了之后,这个计算操作是非常耗时的。
        //val result = session.run("match (a:User {uid:\""+uid+"\"}) <-[:follow]- (b:User) -[:follow]-> (c:User)  return a.uid as auid,c.uid as cuid,count(c.uid) as sum order by sum desc limit 30")
        //对b、c的主活时间进行过滤,以及对c的level和flag值进行过滤
        val result = session.run("match (a:User {uid:\""+uid+"\"}) <-[:follow]- (b:User) -[:follow]-> (c:User) where  b.timestamp >= "+timestamp+" and c.timestamp >= "+timestamp+" and c.level >="+level+" and c.flag = 1 return a.uid as auid,c.uid as cuid,count(c.uid) as sum order by sum desc limit 30")
        while(result.hasNext){
          val record = result.next()
          val sum = record.get("sum").asInt()
          if(sum>duplicateNum){
            resultArr+=record.get("auid").asString()+"\t"+record.get("cuid").asString()
          }
        }
      })
      //关闭会话
      session.close()
      //关闭连接
      driver.close()

      resultArr.iterator
    })

    //把数据转成tuple2的形式
    val tup2RDD = mapRDD.map(line => {
      val splits = line.split("\t")
      (splits(0),splits(1))
    })
    //根据主播id进行分组,可以获取到这个主播的待推荐列表
    val reduceRDD = tup2RDD.reduceByKey((v1,v2)=>{
      v1+","+v2
    })

    //最终把结果组装成这种形式
    //10001 1002,1003,1004
    reduceRDD.map(tup=>{
      tup._1+"\t"+tup._2
    }).repartition(1).saveAsTextFile(outputPath)


  }
}

(3)先不进行条件过滤

先使用这一行代码,在计算三度关系数据的时候暂时先不进行条件过滤。

val result = session.run("match (a:User {uid:\""+uid+"\"}) <-[:follow]- (b:User) -[:follow]-> (c:User)  return a.uid as auid,c.uid as cuid,count(c.uid) as sum order by sum desc limit 30")

执行代码,验证结果如下:

[root@bigdata04 neo4j-community-3.5.21]# hdfs dfs -cat /data/recommend_data/20260201/part-00000
1005    1000
1000    1005,1004
1004    1000

到neo4j中验证一下,确实是正确的。

(4)进行条件过滤

接着使用这一行代码,在计算三度关系数据的对数据进行条件过滤

val result = session.run("match (a:User {uid:\""+uid+"\"}) <-[:follow]- (b:User) -[:follow]-> (c:User) where  b.timestamp >= "+timestamp+" and c.timestamp >= "+timestamp+" and c.level >="+level+" and c.flag = 1 return a.uid as auid,c.uid as cuid,count(c.uid) as sum order by sum desc limit 30")

执行代码,验证结果如下:

[root@bigdata04 neo4j-community-3.5.21]# hdfs dfs -cat /data/recommend_data/20260201/part-00000
1005    1000
1000    1004
1004    1000

到neo4j中验证一下,确实是正确的,因为1005的flag不为1,被过滤掉了,所以我在关注1000这个主播的时候平台只需要给我推荐主1004这个主播即可。

(5)重新设置rowRDD的分区

注意:这个代码在执行mapPartitions的时候,最好把rowRDD的分区重新设置一下,如果让程序自动分配的话可能不太合理,分多了分少了都不太好

由于我们在mapPartitions中需要操作neo4j,所以这个时候rowRDD分区的数量就可以等于(neo4j服务器的CPU数量-1),要给neo4j预留出来一个cpu来处理其它任务请求。

我们当时的服务器是8个CPU,给neo4j预留出来一个,剩下还有7个,所以说,neo4j此时可以对外提供的最大并发处理能力是7,那我们就把rowRDD设置为7个分区,就会有7个线程并行处理数据,它们可以并行操作neo4j,这样效率最高。

如果给rowRDD设置的分区太多,对应的就会有多个线程并行操作neo4j,会给neo4j造成很大的压力,相当于neo4j在满负荷的运行,这个时候我们另外一个实时维护neo4j中粉丝关注数据的程序执行起来就很慢了,以及其他人如果这个时候想查询neo4j,也会非常慢,所以这样就不太好了。

如果给rowRDD设置的分区太少,对应产生的执行线程就比较少,此时neo4j会比较空闲,没有多大压力,但是我们这个三度关系的任务执行就非常慢了。

综上所述,建议把rowRDD的分区数量设置为7,这样可以充分利用neo4j服务器的性能,也不至于把neo4j服务器拖垮。

//将从neo4j中查询出来的数据转换为rowRDD
//val rowRDD = neo4j.loadRowRdd
//repartition 这里的repartition是为了把数据分为7份,这样下面的mapPartitions在执行的时候就有7个线程,这7个线程并行查询neo4j数据库
val rowRDD = neo4j.loadRowRdd.repartition(7)

(6)优化

还有一点可以优化的,增加RDD持久化,把RDD数据缓存起来,这样可以避免个别task失败导致的数据重算,因为这个计算还是比较消耗时间的,所以说尽可能保证计算出来的数据不丢失。

val mapRDD = rowRDD.mapPartitions(it=>{


}).persist(StorageLevel.MEMORY_AND_DISK)//把RDD数据缓存起来

(7)思考

问题:大家有没有想过,我们是否可以直接在Neo4j(sc).cypher(…)中指定一条查询语句,直接把所有的三度关系全部查询出来?

这样理论上是可以的,但是在实际中,当neo4j中存储的节点数和关系数量达到千万级别之后,同时查询所有满足条件主播的三度关系推荐列表的时候会很慢,有时候会导致等了几十分钟也查询不出来数据,所以在这我们就把这个功能进行了拆解,先查看满足条件的主播uid列表,然后再一个一个计算这些主播的三度关系推荐列表,这样可以提高计算效率,并且不会出现查询不出来结果的情况。

下面我们将任务提交到集群上面去执行。

3、打包配置

先添加编译打包配置

<build>
    <plugins>
        <!-- java编译插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- scala编译插件 -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <configuration>
                <scalaCompatVersion>2.11</scalaCompatVersion>
                <scalaVersion>2.11.12</scalaVersion>
            </configuration>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4、进行打包

打jar包

D:\IdeaProjects\db_video_recommend\get_recommend_list>mvn clean package -DskipTests
[INFO] Scanning for projects...
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ get_recommend_list ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend\get_recommend_list\target\get_recommend_list-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.455s
[INFO] Final Memory: 22M/475M
[INFO] ------------------------------------------------------------------------

5、开发脚本

开发任务提交脚本
startGetRecommendList.sh

#!/bin/bash
#默认获取上周一的时间
dt=`date -d "7 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
    dt=`date -d "7 days ago $1" +"%Y%m%d"`
fi

masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`

# 组装一个唯一的名称
appName="GetRecommendListScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"
#获取上周一的时间戳(单位:毫秒)
timestamp=`date --date="${dt}" +%s`000
#粉丝列表关注重合度
duplicateNum=2
#主播等级
level=4

#输出结果数据路径
outputPath="hdfs://bigdata01:9000/data/recommend_data/${dt}"

yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"

spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.GetRecommendListScala \
--jars ${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar,${yarnCommonLib}/neo4j-spark-connector-2.4.5-M1.jar \
/data/soft/video_recommend/jobs/get_recommend_list-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${boltUrl} ${username} ${password} ${timestamp} ${duplicateNum} ${level} ${outputPath}

#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
    echo "任务执行失败"
    # 发送短信或者邮件
else
    echo "任务执行成功"
fi

6、上传jar包和脚本

将项目jar包和任务执行脚本上传到bigdata04机器上

[root@bigdata04 jobs]# ll
-rw-r--r--. 1 root root 11857 Aug 31  2020 get_recommend_list-1.0-SNAPSHOT.jar
-rw-r--r--. 1 root root  1508 Aug 31  2020 startGetRecommendList.sh

7、提交任务、验证

向集群中提交任务

[root@bigdata04 jobs]# sh -x startGetRecommendList.sh 20260201

到hdfs中验证结果,看到这个结果,说明任务在集群中正常执行了。

[root@bigdata04 neo4j-community-3.5.21]# hdfs dfs -cat /data/recommend_data/20260125/*
1005    1000
1000    1004
1004    1000
举报

相关推荐

0 条评论