使用flink-cdc技术(2.3.0)解析binlog实现实时数据大屏。
一、项目环境
该项目主要为一个数据大屏,采用了flink-cdc技术(2.3.0),flink版本为1.14.6。利用它可以自动获取并解析MySQL的binlog,实时把对应的数据解析出来用于大屏展示。
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.14.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
本文主要参考了我自己的文章【十一分钟上手Flink CDC】:https://blog.51cto.com/lenglingx/11242672
同时,这篇也是“使用mysql-binlog-connector-java技术(0.29.2)解析binlog实现实时数据大屏”一文的Flink-CDC版本的技术实现。
同样使用了多线程来处理,不过这次线程池是放到了程序代码里。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cqsym</groupId>
<artifactId>newbig</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.6.15</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!--lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- Test-only starter: scoped to "test" so the testing libraries are not packaged into the runtime artifact. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <!-- optional=true: the dependency is not transitive. This project uses devtools; any project that
         depends on this one must declare devtools itself if it wants it. -->
    <optional>true</optional>
    <!-- "true" is not a valid Maven scope (allowed: compile, provided, runtime, test, system);
         devtools is only needed at runtime, so use the runtime scope. -->
    <scope>runtime</scope>
</dependency>
<!-- jackson-datatype-jsr310 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-jdbc -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-jpa -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.14.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.14.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_2.12</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.14.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
<build>
<finalName>newbig</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.6.15</version>
<configuration>
<fork>true</fork>
<includeSystemScope>true</includeSystemScope>
<!--fork : 如果没有该项配置,可能devtools不会起作用,即应用不会restart -->
<!--这里写上main方法所在类的路径-->
<mainClass>com.cqsym.newbig.Application</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.3.1</version>
<configuration>
<encoding>UTF-8</encoding>
<!-- 以下后缀(p12、cer、pem、pfx)的证书文件不参与资源过滤(filtering),避免二进制内容在打包时被破坏 -->
<nonFilteredFileExtensions>
<nonFilteredFileExtension>p12</nonFilteredFileExtension>
<nonFilteredFileExtension>cer</nonFilteredFileExtension>
<nonFilteredFileExtension>pem</nonFilteredFileExtension>
<nonFilteredFileExtension>pfx</nonFilteredFileExtension>
</nonFilteredFileExtensions>
</configuration>
</plugin>
</plugins>
</build>
</project>
application-dev.properties
#-----------------自定义信息配置---------------------
com.imddy.layuiadmin.title=BOOT
com.imddy.layuiadmin.description=学习一下BOOT是什么
#-----------------自定义信息配置---------------------
executeTask=2
#----------数据库基础配置--------------------
#spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/layuiadmin2?useUnicode=true&useSSL=false&characterEncoding=UTF-8&serverTimezone=UTC
#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/layuiadmin2?useUnicode=true&useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull
#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/twmshelp?useUnicode=true&useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
#spring.datasource.username=root
#spring.datasource.password=root__
#spring.datasource.name=HikaraPool-100
#----------数据库连接池基础配置--------------------
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
# 指定连接池的名称 - 默认自动生成
spring.datasource.hikari.pool-name=HikaraPool-1
# 如果你的驱动程序支持JDBC4,强烈建议不要设置此属性。
#spring.datasource.hikari.connection-test-query=select 1
# 连接超时时间 - 默认值:30秒。
spring.datasource.hikari.connection-timeout=30000
# 连接池中允许闲置的最长时间 - 默认值:10分钟
spring.datasource.hikari.idle-timeout=600000
# 一个连接生命时长(毫秒),超时而没被使用则被释放 - 默认值:30分钟
spring.datasource.hikari.max-lifetime=1800000
# 连接池中允许的最大连接数,包括闲置和使用中的连接 - 默认值:10
spring.datasource.hikari.maximum-pool-size=100
# 连接池中允许的最小空闲连接数 - 默认值:10。
spring.datasource.hikari.minimum-idle=10
# 连接被测试活动的最长时间 - 默认值:5秒。
spring.datasource.hikari.validation-timeout=5000
spring.jpa.show-sql=true
spring.jpa.open-in-view=false
spring.datasource.primary.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.primary.url=jdbc:mysql://127.0.0.1:3306/newbig?useUnicode=true&useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.primary.username=root
spring.datasource.primary.password=root__
# 指定为HikariDataSource
spring.datasource.primary.type=com.zaxxer.hikari.HikariDataSource
# hikari连接池配置 对应 HikariConfig 配置属性类
spring.datasource.primary.hikari.pool-name=HikariCP-Primary
#最小空闲连接数
spring.datasource.primary.hikari.minimum-idle=5
# 空闲连接存活最大时间,默认10分钟
spring.datasource.primary.hikari.idle-timeout=600000
# 连接池最大连接数,默认是10
spring.datasource.primary.hikari.maximum-pool-size=10
# 此属性控制从池返回的连接的默认自动提交行为,默认值:true
spring.datasource.primary.hikari.auto-commit=true
# 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认30分钟
spring.datasource.primary.hikari.max-lifetime=1800000
# 数据库连接超时时间,默认30秒
spring.datasource.primary.hikari.connection-timeout=30000
# 连接测试query,如果你的驱动程序支持JDBC4,强烈建议不要设置此属性。
#spring.datasource.primary.hikari.connection-test-query=SELECT 1
spring.datasource.second.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.second.url=jdbc:mysql://192.168.203.150:3306/twms?useUnicode=true&useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.second.username=aliyun_root
spring.datasource.second.password=root__
spring.datasource.second.host=192.168.203.150
spring.datasource.second.port=3306
# 指定为HikariDataSource
spring.datasource.second.type=com.zaxxer.hikari.HikariDataSource
# hikari连接池配置 对应 HikariConfig 配置属性类
spring.datasource.second.hikari.pool-name=HikariCP-Second
#最小空闲连接数
spring.datasource.second.hikari.minimum-idle=5
# 空闲连接存活最大时间,默认10分钟
spring.datasource.second.hikari.idle-timeout=600000
# 连接池最大连接数,默认是10
spring.datasource.second.hikari.maximum-pool-size=10
# 此属性控制从池返回的连接的默认自动提交行为,默认值:true
spring.datasource.second.hikari.auto-commit=true
# 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认30分钟
spring.datasource.second.hikari.max-lifetime=1800000
# 数据库连接超时时间,默认30秒
spring.datasource.second.hikari.connection-timeout=30000
# 连接测试query,如果你的驱动程序支持JDBC4,强烈建议不要设置此属性。
#spring.datasource.second.hikari.connection-test-query=SELECT 1
## redis 配置
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0
spring.redis.timeout=10s
spring.redis.lettuce.pool.min-idle=0
spring.redis.lettuce.pool.max-idle=8
spring.redis.lettuce.pool.max-active=8
spring.redis.lettuce.pool.max-wait=-1ms
这里配置了redis,但实际项目中没有使用。
NbigscreenController.java
package com.cqsym.newbig.controller;
import com.cqsym.newbig.primary.entity.TownerBase;
import com.cqsym.newbig.service.BinLogService;
import com.cqsym.newbig.service.TownerBaseService;
import com.cqsym.newbig.service.XianluMingchengService;
import com.cqsym.newbig.service.ZaituDingdanNumSerivce;
import com.cqsym.newbig.service.ZuoyeNumService;
import com.cqsym.newbig.utils.AjaxResult;
import com.cqsym.newbig.vo.DaquNumVo;
import com.cqsym.newbig.vo.OwnerBaseNumVo;
import com.cqsym.newbig.vo.XianluMingchengVo;
import com.cqsym.newbig.vo.ZaituDingdanVo;
import com.cqsym.newbig.vo.ZhanbiVo;
import com.cqsym.newbig.vo.ZuoyeShishiQingkuangVo;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.List;
import java.util.Map;
/**
 * HTTP endpoints under {@code /nbigscreen}.
 *
 * <p>Every handler is annotated with {@code @ResponseBody} and returns an {@link AjaxResult}
 * wrapping whatever the delegated service call produced. The handlers contain no logic of their
 * own beyond logging the call and delegating.
 *
 * <p>NOTE(review): the mappings do not restrict the HTTP method, so {@code /on} and {@code /off}
 * are reachable via GET as well; confirm whether that is intended before tightening it.
 */
@Controller
@RequestMapping("/nbigscreen")
public class NbigscreenController {
    private static final Logger log = LoggerFactory.getLogger(NbigscreenController.class);

    @Autowired
    private BinLogService binLogService;
    @Autowired
    private ZuoyeNumService zuoyeNumService;
    @Autowired
    private XianluMingchengService xianluMingchengService;
    @Autowired
    private ZaituDingdanNumSerivce zaituDingdanNumSerivce;
    @Autowired
    private TownerBaseService townerBaseService;

    /**
     * Simple probe endpoint.
     *
     * @return a success result built from the literal {@code "index"}
     */
    @ResponseBody
    @RequestMapping("/index")
    public AjaxResult index() {
        log.info("index... ");
        return AjaxResult.success("index");
    }

    /**
     * Calls {@code binLogService.start()}.
     *
     * @return a success result, or an error result carrying the exception message if the call threw
     */
    @ResponseBody
    @RequestMapping("/on")
    public AjaxResult on() {
        log.info("on... ");
        try {
            binLogService.start();
        } catch (Exception exception) {
            // Keep the stack trace in the server log; the client only receives the message.
            log.error("binLogService.start() failed", exception);
            return AjaxResult.error(exception.getMessage());
        }
        return AjaxResult.success();
    }

    /**
     * Calls {@code binLogService.stop()}.
     *
     * @return a success result, or an error result carrying the exception message if the call threw
     */
    @ResponseBody
    @RequestMapping("/off")
    public AjaxResult off() {
        log.info("off... ");
        try {
            binLogService.stop();
        } catch (Exception exception) {
            // Keep the stack trace in the server log; the client only receives the message.
            log.error("binLogService.stop() failed", exception);
            return AjaxResult.error(exception.getMessage());
        }
        return AjaxResult.success();
    }

    /**
     * @return a success result wrapping the map returned by {@code zuoyeNumService.getZuoyeNum()}
     */
    @ResponseBody
    @RequestMapping("/getZuoyeNum")
    public AjaxResult getZuoyeNum() {
        log.info("getZuoyeNum... ");
        // Wildcard type instead of a raw Map: accepts any parameterization the service declares.
        Map<?, ?> zuoyeNum = zuoyeNumService.getZuoyeNum();
        return AjaxResult.success(zuoyeNum);
    }

    /**
     * @return a success result wrapping the list from {@code zuoyeNumService.getBanshichuZuoyeNum()}
     */
    @ResponseBody
    @RequestMapping("/getBanshichuZuoyeNum")
    public AjaxResult getBanshichuZuoyeNum() {
        log.info("getBanshichuZuoyeNum... ");
        List<OwnerBaseNumVo> list = zuoyeNumService.getBanshichuZuoyeNum();
        return AjaxResult.success(list);
    }

    /**
     * @return a success result wrapping the list from {@code zuoyeNumService.getDaquZuoyeNum()}
     */
    @ResponseBody
    @RequestMapping("/getDaquZuoyeNum")
    public AjaxResult getDaquZuoyeNum() {
        log.info("getDaquZuoyeNum... ");
        List<DaquNumVo> list = zuoyeNumService.getDaquZuoyeNum();
        return AjaxResult.success(list);
    }

    /**
     * @return a success result wrapping the list from
     *         {@code zuoyeNumService.getShishiZuoyeQingkuang()}
     */
    @ResponseBody
    @RequestMapping("/getShishiZuoyeQingkuang")
    public AjaxResult getShishiZuoyeQingkuang() {
        log.info("getShishiZuoyeQingkuang... ");
        List<ZuoyeShishiQingkuangVo> list = zuoyeNumService.getShishiZuoyeQingkuang();
        return AjaxResult.success(list);
    }

    /**
     * @return a success result wrapping the list from
     *         {@code xianluMingchengService.getXianluMingcheng()}
     */
    @ResponseBody
    @RequestMapping("/getXianluMingcheng")
    public AjaxResult getXianluMingcheng() {
        log.info("getXianluMingcheng... ");
        List<XianluMingchengVo> list = xianluMingchengService.getXianluMingcheng();
        return AjaxResult.success(list);
    }

    /**
     * @return a success result wrapping the list from
     *         {@code zaituDingdanNumSerivce.getZaituDingdanNumForEveryDay()}
     * @throws JsonProcessingException declared by this handler; presumably propagated from the
     *         service call (its signature is not visible here)
     */
    @ResponseBody
    @RequestMapping("/getZaituDingdanNumForEveryDay")
    public AjaxResult getZaituDingdanNumForEveryDay() throws JsonProcessingException {
        log.info("getZaituDingdanNumForEveryDay... ");
        List<ZaituDingdanVo> list = zaituDingdanNumSerivce.getZaituDingdanNumForEveryDay();
        return AjaxResult.success(list);
    }

    /**
     * @return a success result wrapping the list from {@code townerBaseService.findAll()}
     */
    @ResponseBody
    @RequestMapping("/getTOwnerBaseAll")
    public AjaxResult getTOwnerBaseAll() {
        log.info("getTOwnerBaseAll... ");
        List<TownerBase> list = townerBaseService.findAll();
        return AjaxResult.success(list);
    }

    /**
     * @return a success result wrapping the list from {@code zuoyeNumService.getZuoyeZhanbi()}
     */
    @ResponseBody
    @RequestMapping("/getZuoyeZhanbi")
    public AjaxResult getZuoyeZhanbi() {
        log.info("getZuoyeZhanbi... ");
        List<ZhanbiVo> list = zuoyeNumService.getZuoyeZhanbi();
        return AjaxResult.success(list);
    }
}