0
点赞
收藏
分享

微信扫一扫

FileBeat + Flume + Kafka + HDFS + Neo4j + Flink + Redis:【案例】三度关系推荐V2.0版本03:实时维护粉丝关注数据

四月天2021 2022-03-19 阅读 35
hadoopflink

一、数据计算步骤汇总

下面我们通过文字梳理一下具体的数据计算步骤。
第一步:历史粉丝关注数据初始化
第二步:实时维护粉丝关注数据
第三步:每天定时更新主播等级
第四步:每天定时更新用户活跃时间
第五步:每周一计算最近一月主播视频评级
第六步:每周一计算最近一周内主播主播的三度关系列表。
第七步:三度关系列表数据导出到Redis

代码下载:

链接:https://pan.baidu.com/s/1kzuwD3XarH26_roq255Yyg?pwd=559p 
提取码:559p 

二、实时维护粉丝关注数据

1、创建项目

使用Flink程序实时维护Neo4j中粉丝关注数据
先创建maven项目db_video_recommend_v2
在pom.xml中添加项目需要用到的所有依赖

再创建子module项目:real_time_follow
在项目中创建scala目录,引入scala2.12版本的SDK
创建package:com.imooc.flink
在pom.xml中添加依赖

<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.neo4j.driver</groupId>
        <artifactId>neo4j-java-driver</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>

在resources目录中添加log4j.properties配置文件

2、创建RealTimeFollowScala

创建类:RealTimeFollowScala
代码如下:

package com.imooc.flink

import java.util.Properties

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * 任务2:
 * 实时维护粉丝关注数据
 * 
 */
object RealTimeFollowScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //指定FlinkKafkaConsumer相关配置
    val topic = "user_follow"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers","bigdata01:9092,bigdata02:9092,bigdata03:9092")
    prop.setProperty("group.id","con2")
    val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
    //kafka consumer的消费策略设置
    kafkaConsumer.setStartFromGroupOffsets()
    //指定kafka作为source
    import org.apache.flink.api.scala._
    val text = env.addSource(kafkaConsumer)

    //解析json数据中的核心字段
    val tupStream = text.map(line => {
      val jsonObj = JSON.parseObject(line)
      val desc = jsonObj.getString("desc")
      val followerUid = jsonObj.getString("followeruid")
      val followUid = jsonObj.getString("followuid")
      (desc, followerUid, followUid)
    })

    //使用Neo4jSink维护粉丝关注数据

    env.execute("RealTimeFollowScala")
  }

}

注意:由于flink中的实时计算是来一条数据计算一次,在StreamAPI中没有mapPartition方法,不支持一批一批的处理,如果每处理一条数据就获取一次Neo4j数据库连接,这样效率就太差了,所以我们需要实现一个自定义的sink组件,在sink组件内部有一个初始化函数可以获取一次连接,多次使用,这样就不需要频繁创建neo4j数据库连接了。

实现自定义的sink需要实现SinkFunction接口或者继承RichSinkFunction
具体实现逻辑可以参考已有connector中针对sink组件的实现
例如:RedisSink
源码在这里:

https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java

这里面一共有三个主要的函数:
1:open,是一个初始化方法,在Sink组件初始化的时候执行一次,适合在里面初始化一些资源连接
2:invoke,会被频繁调用,sink接收到一条数据这个方法就会执行一次,具体的业务逻辑在这里实现
3:close,当任务停止的时候,会先调用sink组件中的close方法,适合在里面做一些关闭资源的操作

参考RedisSink的源码我们来开发一个基于Neo4j的自定义Sink

3、创建Neo4jSink

创建类:Neo4jSink
代码如下:

package com.imooc.flink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.neo4j.driver.{AuthTokens, Driver, GraphDatabase, Transaction, TransactionWork}

/**
 * 维护粉丝数据在Neo4j中的关注关系
 * 关注和取消关注
 *
 */
class Neo4jSink extends RichSinkFunction[Tuple3[String,String,String]]{
  //保存neo4j相关的配置参数
  var param: Map[String,String] = Map()

  var driver: Driver = _

  /**
   * 构造函数
   * 接收neo4j相关的配置参数
   * @param param
   */
  def this(param: Map[String,String]){
    this()
    this.param = param
  }

  /**
   * 初始化方法,只执行一次
   * 适合初始化资源连接
   * @param parameters
   */
  override def open(parameters: Configuration): Unit = {
    //初始化Neo4j连接
    this.driver = GraphDatabase.driver(param("boltUrl"), AuthTokens.basic(param("userName"), param("passWord")))
  }

  /**
   * 核心代码,来一条数据,此方法就会执行一次
   * @param value
   * @param context
   */
  override def invoke(value: Tuple3[String,String,String], context: SinkFunction.Context[_]): Unit = {
    //开启会话
    val session = driver.session()
    val followType = value._1
    val followerUid = value._2
    val followUid = value._3

    if("follow".equals(followType)){
      //添加关注:因为涉及多条命令,所以需要使用事务
      session.writeTransaction(new TransactionWork[Unit] (){
        override def execute(tx: Transaction): Unit = {
          try{
            tx.run("merge (:User {uid:'"+followerUid+"'})")
            tx.run("merge (:User {uid:'"+followUid+"'})")
            tx.run("match(a:User {uid:'"+followerUid+"'}),(b:User {uid:'"+followUid+"'}) merge (a) -[:follow]-> (b)")
            //提交事务
            tx.commit()
          }catch {
            case ex: Exception => tx.rollback()
          }
        }
      })
    }else{
      //取消关注
      session.run("match (:User {uid:'"+followerUid+"'}) -[r:follow]-> (:User {uid:'"+followUid+"'}) delete r")
    }
    //关闭会话
    session.close()
  }

  /**
   * 任务停止的时候会先调用此方法
   * 适合关闭资源连接
   */
  override def close(): Unit = {
    //关闭连接
    if(driver!= null){
      driver.close()
    }
  }


}

4、完善RealTimeFollowScala

完善RealTimeFollowScala的代码

//使用Neo4jSink维护粉丝关注数据
val param = Map("boltUrl"->"bolt://bigdata04:7687","userName"->"neo4j","passWord"->"admin")
tupStream.addSink(new Neo4jSink(param))

此时RealTimeFollowScala中的完整代码如下:

package com.imooc.flink

import java.util.Properties

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * 任务2:
 * 实时维护粉丝关注数据
 * 
 */
object RealTimeFollowScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //指定FlinkKafkaConsumer相关配置
    val topic = "user_follow"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers","bigdata01:9092,bigdata02:9092,bigdata03:9092")
    prop.setProperty("group.id","con2")
    val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
    //kafka consumer的消费策略设置
    kafkaConsumer.setStartFromGroupOffsets()
    //指定kafka作为source
    import org.apache.flink.api.scala._
    val text = env.addSource(kafkaConsumer)

    //解析json数据中的核心字段
    val tupStream = text.map(line => {
      val jsonObj = JSON.parseObject(line)
      val desc = jsonObj.getString("desc")
      val followerUid = jsonObj.getString("followeruid")
      val followUid = jsonObj.getString("followuid")
      (desc, followerUid, followUid)
    })

    //使用Neo4jSink维护粉丝关注数据
    val param = Map("boltUrl"->"bolt://bigdata04:7687","userName"->"neo4j","passWord"->"admin")
    tupStream.addSink(new Neo4jSink(param))

    env.execute("RealTimeFollowScala")
  }

}

执行RealTimeFollowScala代码

注意:需要确保zookeeper、kafka服务是正常运行的。

接下来需要产生测试数据,我们可以继续使用之前generate_data项目中的GenerateRealTimeFollowData产生数据,这种流程我们前面在v1.0中已经使用过了。

5、方便测试的方法

下面给大家演示一种方便测试的方法。
其实我们可以通过kafka的基于console的生产者直接向user_follow这个topic生产数据。

[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user_follow

先模拟生成一条粉丝关注的数据

{"followeruid":"2004","followuid":"2008","timestamp":1598771070069,"type":"user_follow","desc":"follow"}

到neo4j中确认效果发现确实新增了一个关注关系
再模拟产生一条粉丝取消关注的数据

{"followeruid":"2004","followuid":"2008","timestamp":1598771070069,"type":"user_follow","desc":"unfollow"}

到neo4j中确认效果发现刚才新增的关注关系没有了。
这样就说明我们自己定义的Neo4jSink是可以正常工作的。

注意:在实际工作中,有时候为了方便测试代码是否可以正常运行,很多时候也会采用这种基于控制台的生产者直接模拟产生数据,这样不会经过中间商,没有差价!
如果使用整个数据采集全链路流程的话,可能会由于中间某个环节出问题导致的最终看不到效果,此时我们还得排查到底是哪里出了问题,这样就乱套了,本来是要验证代码逻辑的,结果又要去排查其它地方的问题了。
所以说针对流程比较复杂的,我们在测试的时候一块一块进行测试,先验证代码逻辑没问题,最后再跑一个全流程确认一下最终效果。

接下来我们希望把代码提交到集群上运行

6、提取参数

需要先调整代码,把参数提取出来

package com.imooc.flink

import java.util.Properties

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * 任务2:
 * 实时维护粉丝关注数据
 * 
 */
object RealTimeFollowScala {
  def main(args: Array[String]): Unit = {
    var appName = "RealTimeFollowScala"
    var kafkaBrokers = "bigdata01:9092,bigdata02:9092,bigdata03:9092"
    var groupId = "con_1"
    var topic = "user_follow"
    var boltUrl = "bolt://bigdata04:7687"
    var userName = "neo4j"
    var passWord = "admin"
    if(args.length > 0){
      appName = args(0)
      kafkaBrokers = args(1)
      groupId = args(2)
      topic = args(3)
      boltUrl = args(4)
      userName = args(5)
      passWord = args(6)
    }
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //指定FlinkKafkaConsumer相关配置
    val prop = new Properties()
    prop.setProperty("bootstrap.servers",kafkaBrokers)
    prop.setProperty("group.id",groupId)
    val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
    //kafka consumer的消费策略设置
    kafkaConsumer.setStartFromGroupOffsets()
    //指定kafka作为source
    import org.apache.flink.api.scala._
    val text = env.addSource(kafkaConsumer)

    //解析json数据中的核心字段
    val tupStream = text.map(line => {
      val jsonObj = JSON.parseObject(line)
      val desc = jsonObj.getString("desc")
      val followerUid = jsonObj.getString("followeruid")
      val followUid = jsonObj.getString("followuid")
      (desc, followerUid, followUid)
    })

    //使用Neo4jSink维护粉丝关注数据
    val param = Map("boltUrl"->boltUrl,"userName"->userName,"passWord"->passWord)
    tupStream.addSink(new Neo4jSink(param))

    env.execute(appName)
  }

}

7、打包配置

对项目打jar包
在pom.xml中添加编译打包配置
这里面的scala版本指定的是2.12版本
注意:flink官方建议把所有依赖都打进一个jar包,所以我们在这就把依赖打进一个jar包里面。
在flink1.11的时候新增了一个特性,可以支持动态指定依赖的jar包,但是我测试了还是有bug,所以在这我们就只能把依赖都打进jar包里面,其实我内心是拒绝的。

注意:针对log4j,flink相关的依赖在打包的时候不需要打进去,所以需要添加provided属性。

<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.neo4j.driver</groupId>
        <artifactId>neo4j-java-driver</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- 编译插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- scala编译插件 -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <configuration>
                <scalaCompatVersion>2.12</scalaCompatVersion>
                <scalaVersion>2.12.11</scalaVersion>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- 打jar包插件(会包含所有依赖) -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- 可以设置jar包的入口类(可选) -->
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

8、打包

打jar包

D:\IdeaProjects\db_video_recommend_v2\real_time_follow>mvn clean package -DskipTests
[INFO] Scanning for projects...
......
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ real_time_follow ---
[INFO] Building jar: [INFO] Building jar: D:\IdeaProjects\db_video_recommend_v2\real_time_follow\target\real_time_follow-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-assembly-plugin:2.6:single (make-assembly) @ real_time_follow ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend_v2\real_time_follow\target\real_time_follow-1.0-SNAPSHOT-jar-with-dependencies.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.077s
[INFO] Final Memory: 15M/491M
[INFO] ------------------------------------------------------------------------

此时我们就需要使用这个带有jar-with-dependencies的jar包了
real_time_follow-1.0-SNAPSHOT-jar-with-dependencies.jar

9、开发脚本

开发任务提交脚本
startRealTimeFollow.sh

#!/bin/bash
masterUrl="yarn-cluster"
appName="RealTimeFollowScala"
kafkaBrokers="bigdata01:9092,bigdata02:9092,bigdata03:9092"
groupId="con_2"
topic="user_follow"
boltUrl="bolt://bigdata04:7687"
userName="neo4j"
passWord="admin"

#注意:需要将flink脚本路径配置到Linux的环境变量中
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.imooc.flink.RealTimeFollowScala \
/data/soft/video_recommend_v2/jobs/real_time_follow-1.0-SNAPSHOT-jar-with-dependencies.jar ${appName} ${kafkaBrokers} ${groupId} ${topic} ${boltUrl} ${userName} ${passWord}

在bigdata04上创建目录:video_recommend_v2和jobs目录

[root@bigdata04 soft]# mkdir -p video_recommend_v2/jobs

10、上传脚本

将打好的jar包和脚本上传到jobs目录中

[root@bigdata04 jobs]# ll
-rw-r--r--. 1 root root 80993088 Sep  6  2020 real_time_follow-1.0-SNAPSHOT-jar-with-dependencies.jar
-rw-r--r--. 1 root root      616 Feb 21 05:20 startRealTimeFollow.sh

11、检查环境变量

注意:在执行之前需要配置flink的环境变量,FLINK_HOME

[root@bigdata04 jobs]# vi /etc/profile
......
export JAVA_HOME=/data/soft/jdk1.8
export HADOOP_HOME=/data/soft/hadoop-3.2.0
export HIVE_HOME=/data/soft/apache-hive-3.1.2-bin
export SPARK_HOME=/data/soft/spark-2.4.3-bin-hadoop2.7
export SQOOP_HOME=/data/soft/sqoop-1.4.7.bin__hadoop-2.6.0
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
export FLINK_HOME=/data/soft/flink-1.11.1
export PATH=.:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HIVE_HOME/bin:$SPARK_HO
ME/bin:$SQOOP_HOME/bin:$FLINK_HOME/bin:$PATH
[root@bigdata04 jobs]# source /etc/profile

12、开始提交任务

向集群提交任务

[root@bigdata04 jobs]# sh -x startRealTimeFollow.sh

通过kafka的console控制台生产者,模拟产生数据,到neo4j中确认效果,发现是没有问题的。

注意:此时是存在数据乱序的问题的,前面在讲Flink的时候我们详细讲解过Flink中的乱序处理方案,在这里给大家留一个作业,对这个代码进行改造,解决数据乱序问题。

举报

相关推荐

0 条评论