一、数据计算步骤汇总
下面我们通过文字梳理一下具体的数据计算步骤。
第一步:历史粉丝关注数据初始化
第二步:实时维护粉丝关注数据
第三步:每天定时更新主播等级
第四步:每天定时更新用户活跃时间
第五步:每周一计算最近一月主播视频评级
第六步:每周一计算最近一周内主播主播的三度关系列表。
第七步:三度关系列表数据导出到Redis
代码下载:
链接:https://pan.baidu.com/s/1kzuwD3XarH26_roq255Yyg?pwd=559p
提取码:559p
二、实时维护粉丝关注数据
1、创建项目
使用Flink程序实时维护Neo4j中粉丝关注数据
先创建maven项目db_video_recommend_v2
在pom.xml中添加项目需要用到的所有依赖
再创建子module项目:real_time_follow
在项目中创建scala目录,引入scala2.12版本的SDK
创建package:com.imooc.flink
在pom.xml中添加依赖
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
在resources目录中添加log4j.properties配置文件
2、创建RealTimeFollowScala
创建类:RealTimeFollowScala
代码如下:
package com.imooc.flink
import java.util.Properties
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
/**
* 任务2:
* 实时维护粉丝关注数据
*
*/
object RealTimeFollowScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//指定FlinkKafkaConsumer相关配置
val topic = "user_follow"
val prop = new Properties()
prop.setProperty("bootstrap.servers","bigdata01:9092,bigdata02:9092,bigdata03:9092")
prop.setProperty("group.id","con2")
val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
//kafka consumer的消费策略设置
kafkaConsumer.setStartFromGroupOffsets()
//指定kafka作为source
import org.apache.flink.api.scala._
val text = env.addSource(kafkaConsumer)
//解析json数据中的核心字段
val tupStream = text.map(line => {
val jsonObj = JSON.parseObject(line)
val desc = jsonObj.getString("desc")
val followerUid = jsonObj.getString("followeruid")
val followUid = jsonObj.getString("followuid")
(desc, followerUid, followUid)
})
//使用Neo4jSink维护粉丝关注数据
env.execute("RealTimeFollowScala")
}
}
注意:由于flink中的实时计算是来一条数据计算一次,在StreamAPI中没有mapPartition方法,不支持一批一批的处理,如果每处理一条数据就获取一次Neo4j数据库连接,这样效率就太差了,所以我们需要实现一个自定义的sink组件,在sink组件内部有一个初始化函数可以获取一次连接,多次使用,这样就不需要频繁创建neo4j数据库连接了。
实现自定义的sink需要实现SinkFunction接口或者继承RichSinkFunction
具体实现逻辑可以参考已有connector中针对sink组件的实现
例如:RedisSink
源码在这里:
https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
这里面一共有三个主要的函数:
1:open,是一个初始化方法,在Sink组件初始化的时候执行一次,适合在里面初始化一些资源连接
2:invoke,会被频繁调用,sink接收到一条数据这个方法就会执行一次,具体的业务逻辑在这里实现
3:close,当任务停止的时候,会先调用sink组件中的close方法,适合在里面做一些关闭资源的操作
参考RedisSink的源码我们来开发一个基于Neo4j的自定义Sink
3、创建Neo4jSink
创建类:Neo4jSink
代码如下:
package com.imooc.flink
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.neo4j.driver.{AuthTokens, Driver, GraphDatabase, Transaction, TransactionWork}
/**
* 维护粉丝数据在Neo4j中的关注关系
* 关注和取消关注
*
*/
class Neo4jSink extends RichSinkFunction[Tuple3[String,String,String]]{
//保存neo4j相关的配置参数
var param: Map[String,String] = Map()
var driver: Driver = _
/**
* 构造函数
* 接收neo4j相关的配置参数
* @param param
*/
def this(param: Map[String,String]){
this()
this.param = param
}
/**
* 初始化方法,只执行一次
* 适合初始化资源连接
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//初始化Neo4j连接
this.driver = GraphDatabase.driver(param("boltUrl"), AuthTokens.basic(param("userName"), param("passWord")))
}
/**
* 核心代码,来一条数据,此方法就会执行一次
* @param value
* @param context
*/
override def invoke(value: Tuple3[String,String,String], context: SinkFunction.Context[_]): Unit = {
//开启会话
val session = driver.session()
val followType = value._1
val followerUid = value._2
val followUid = value._3
if("follow".equals(followType)){
//添加关注:因为涉及多条命令,所以需要使用事务
session.writeTransaction(new TransactionWork[Unit] (){
override def execute(tx: Transaction): Unit = {
try{
tx.run("merge (:User {uid:'"+followerUid+"'})")
tx.run("merge (:User {uid:'"+followUid+"'})")
tx.run("match(a:User {uid:'"+followerUid+"'}),(b:User {uid:'"+followUid+"'}) merge (a) -[:follow]-> (b)")
//提交事务
tx.commit()
}catch {
case ex: Exception => tx.rollback()
}
}
})
}else{
//取消关注
session.run("match (:User {uid:'"+followerUid+"'}) -[r:follow]-> (:User {uid:'"+followUid+"'}) delete r")
}
//关闭会话
session.close()
}
/**
* 任务停止的时候会先调用此方法
* 适合关闭资源连接
*/
override def close(): Unit = {
//关闭连接
if(driver!= null){
driver.close()
}
}
}
4、完善RealTimeFollowScala
完善RealTimeFollowScala的代码
//使用Neo4jSink维护粉丝关注数据
val param = Map("boltUrl"->"bolt://bigdata04:7687","userName"->"neo4j","passWord"->"admin")
tupStream.addSink(new Neo4jSink(param))
此时RealTimeFollowScala中的完整代码如下:
package com.imooc.flink
import java.util.Properties
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
/**
* 任务2:
* 实时维护粉丝关注数据
*
*/
object RealTimeFollowScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//指定FlinkKafkaConsumer相关配置
val topic = "user_follow"
val prop = new Properties()
prop.setProperty("bootstrap.servers","bigdata01:9092,bigdata02:9092,bigdata03:9092")
prop.setProperty("group.id","con2")
val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
//kafka consumer的消费策略设置
kafkaConsumer.setStartFromGroupOffsets()
//指定kafka作为source
import org.apache.flink.api.scala._
val text = env.addSource(kafkaConsumer)
//解析json数据中的核心字段
val tupStream = text.map(line => {
val jsonObj = JSON.parseObject(line)
val desc = jsonObj.getString("desc")
val followerUid = jsonObj.getString("followeruid")
val followUid = jsonObj.getString("followuid")
(desc, followerUid, followUid)
})
//使用Neo4jSink维护粉丝关注数据
val param = Map("boltUrl"->"bolt://bigdata04:7687","userName"->"neo4j","passWord"->"admin")
tupStream.addSink(new Neo4jSink(param))
env.execute("RealTimeFollowScala")
}
}
执行RealTimeFollowScala代码
注意:需要确保zookeeper、kafka服务是正常运行的。
接下来需要产生测试数据,我们可以继续使用之前generate_data项目中的GenerateRealTimeFollowData产生数据,这种流程我们前面在v1.0中已经使用过了。
5、方便测试的方法
下面给大家演示一种方便测试的方法。
其实我们可以通过kafka的基于console的生产者直接向user_follow这个topic生产数据。
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user_follow
先模拟生成一条粉丝关注的数据
{"followeruid":"2004","followuid":"2008","timestamp":1598771070069,"type":"user_follow","desc":"follow"}
到neo4j中确认效果发现确实新增了一个关注关系
再模拟产生一条粉丝取消关注的数据
{"followeruid":"2004","followuid":"2008","timestamp":1598771070069,"type":"user_follow","desc":"unfollow"}
到neo4j中确认效果发现刚才新增的关注关系没有了。
这样就说明我们自己定义的Neo4jSink是可以正常工作的。
注意:在实际工作中,有时候为了方便测试代码是否可以正常运行,很多时候也会采用这种基于控制台的生产者直接模拟产生数据,这样不会经过中间商,没有差价!
如果使用整个数据采集全链路流程的话,可能会由于中间某个环节出问题导致的最终看不到效果,此时我们还得排查到底是哪里出了问题,这样就乱套了,本来是要验证代码逻辑的,结果又要去排查其它地方的问题了。
所以说针对流程比较复杂的,我们在测试的时候一块一块进行测试,先验证代码逻辑没问题,最后再跑一个全流程确认一下最终效果。
接下来我们希望把代码提交到集群上运行
6、提取参数
需要先调整代码,把参数提取出来
package com.imooc.flink
import java.util.Properties
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
/**
* 任务2:
* 实时维护粉丝关注数据
*
*/
object RealTimeFollowScala {
def main(args: Array[String]): Unit = {
var appName = "RealTimeFollowScala"
var kafkaBrokers = "bigdata01:9092,bigdata02:9092,bigdata03:9092"
var groupId = "con_1"
var topic = "user_follow"
var boltUrl = "bolt://bigdata04:7687"
var userName = "neo4j"
var passWord = "admin"
if(args.length > 0){
appName = args(0)
kafkaBrokers = args(1)
groupId = args(2)
topic = args(3)
boltUrl = args(4)
userName = args(5)
passWord = args(6)
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
//指定FlinkKafkaConsumer相关配置
val prop = new Properties()
prop.setProperty("bootstrap.servers",kafkaBrokers)
prop.setProperty("group.id",groupId)
val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
//kafka consumer的消费策略设置
kafkaConsumer.setStartFromGroupOffsets()
//指定kafka作为source
import org.apache.flink.api.scala._
val text = env.addSource(kafkaConsumer)
//解析json数据中的核心字段
val tupStream = text.map(line => {
val jsonObj = JSON.parseObject(line)
val desc = jsonObj.getString("desc")
val followerUid = jsonObj.getString("followeruid")
val followUid = jsonObj.getString("followuid")
(desc, followerUid, followUid)
})
//使用Neo4jSink维护粉丝关注数据
val param = Map("boltUrl"->boltUrl,"userName"->userName,"passWord"->passWord)
tupStream.addSink(new Neo4jSink(param))
env.execute(appName)
}
}
7、打包配置
对项目打jar包
在pom.xml中添加编译打包配置
这里面的scala版本指定的是2.12版本
注意:flink官方建议把所有依赖都打进一个jar包,所以我们在这就把依赖打进一个jar包里面。
在flink1.11的时候新增了一个特性,可以支持动态指定依赖的jar包,但是我测试了还是有bug,所以在这我们就只能把依赖都打进jar包里面,其实我内心是拒绝的。
注意:针对log4j,flink相关的依赖在打包的时候不需要打进去,所以需要添加provided属性。
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.11</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
8、打包
打jar包
D:\IdeaProjects\db_video_recommend_v2\real_time_follow>mvn clean package -DskipTests
[INFO] Scanning for projects...
......
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ real_time_follow ---
[INFO] Building jar: [INFO] Building jar: D:\IdeaProjects\db_video_recommend_v2\real_time_follow\target\real_time_follow-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-assembly-plugin:2.6:single (make-assembly) @ real_time_follow ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend_v2\real_time_follow\target\real_time_follow-1.0-SNAPSHOT-jar-with-dependencies.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.077s
[INFO] Final Memory: 15M/491M
[INFO] ------------------------------------------------------------------------
此时我们就需要使用这个带有jar-with-dependencies的jar包了
real_time_follow-1.0-SNAPSHOT-jar-with-dependencies.jar
9、开发脚本
开发任务提交脚本
startRealTimeFollow.sh
#!/bin/bash
masterUrl="yarn-cluster"
appName="RealTimeFollowScala"
kafkaBrokers="bigdata01:9092,bigdata02:9092,bigdata03:9092"
groupId="con_2"
topic="user_follow"
boltUrl="bolt://bigdata04:7687"
userName="neo4j"
passWord="admin"
#注意:需要将flink脚本路径配置到Linux的环境变量中
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.imooc.flink.RealTimeFollowScala \
/data/soft/video_recommend_v2/jobs/real_time_follow-1.0-SNAPSHOT-jar-with-dependencies.jar ${appName} ${kafkaBrokers} ${groupId} ${topic} ${boltUrl} ${userName} ${passWord}
在bigdata04上创建目录:video_recommend_v2和jobs目录
[root@bigdata04 soft]# mkdir -p video_recommend_v2/jobs
10、上传脚本
将打好的jar包和脚本上传到jobs目录中
[root@bigdata04 jobs]# ll
-rw-r--r--. 1 root root 80993088 Sep 6 2020 real_time_follow-1.0-SNAPSHOT-jar-with-dependencies.jar
-rw-r--r--. 1 root root 616 Feb 21 05:20 startRealTimeFollow.sh
11、检查环境变量
注意:在执行之前需要配置flink的环境变量,FLINK_HOME
[root@bigdata04 jobs]# vi /etc/profile
......
export JAVA_HOME=/data/soft/jdk1.8
export HADOOP_HOME=/data/soft/hadoop-3.2.0
export HIVE_HOME=/data/soft/apache-hive-3.1.2-bin
export SPARK_HOME=/data/soft/spark-2.4.3-bin-hadoop2.7
export SQOOP_HOME=/data/soft/sqoop-1.4.7.bin__hadoop-2.6.0
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
export FLINK_HOME=/data/soft/flink-1.11.1
export PATH=.:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HIVE_HOME/bin:$SPARK_HO
ME/bin:$SQOOP_HOME/bin:$FLINK_HOME/bin:$PATH
[root@bigdata04 jobs]# source /etc/profile
12、开始提交任务
向集群提交任务
[root@bigdata04 jobs]# sh -x startRealTimeFollow.sh
通过kafka的console控制台生产者,模拟产生数据,到neo4j中确认效果,发现是没有问题的。
注意:此时是存在数据乱序的问题的,前面在讲Flink的时候我们详细讲解过Flink中的乱序处理方案,在这里给大家留一个作业,对这个代码进行改造,解决数据乱序问题。