0
点赞
收藏
分享

微信扫一扫

使用flink-cdc技术(2.3.0)解析binlog实现实时数据大屏

霸姨 2024-07-26 阅读 22

使用flink-cdc技术(2.3.0)解析binlog实现实时数据大屏。


一、项目环境

该项目主要为一个数据大屏,采用了flink-cdc技术(2.3.0),flink版本为1.14.6。利用它可以自动获取并解析MySQL的binlog,实时把对应的数据解析出来用于大屏展示。

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.14.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>

该文章主要参考了我自己的文章【十一分钟上手Flink CDC】:https://blog.51cto.com/lenglingx/11242672

同样这篇就是“使用mysql-binlog-connector-java技术(0.29.2)解析binlog实现实时数据大屏”的Flink-CDC的技术实现。

同样使用了多线程来处理,不过线程池是放到了程序代码里。


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cqsym</groupId>
    <artifactId>newbig</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>


    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.6.15</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!--lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- Test-only libraries: restrict to the test classpath so they are not packaged into the application jar. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-json</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <!-- "true" is not a valid Maven scope; devtools is only needed at runtime during development. -->
            <scope>runtime</scope>
            <!-- optional=true: the dependency is not transitive. Projects that depend on this one must declare devtools themselves if they want to use it. -->
            <optional>true</optional>
        </dependency>
        <!-- jackson-datatype-jsr310 -->
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-jdbc -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-jpa -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.14.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>1.14.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime_2.12</artifactId>
            <version>1.14.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.14.6</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>32.1.3-jre</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>newbig</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <!-- Spring Boot plugin: repackages the build output into an executable jar (see the "repackage" goal below). -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.6.15</version>
                <configuration>
                    <fork>true</fork>
                    <includeSystemScope>true</includeSystemScope>
                    <!-- fork: per the original author, without this setting devtools may not take effect, i.e. the application will not restart (NOTE(review): confirm against the plugin version in use) -->
                    <!-- Fully qualified name of the class that contains the main method -->
                    <mainClass>com.cqsym.newbig.Application</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <!-- Resource copying: UTF-8 encoding, with binary certificate/keystore files excluded from filtering. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.3.1</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <!-- Files with these extensions (p12, cer, pem, pfx certificate/keystore files) are copied as-is and never run through resource filtering -->
                    <nonFilteredFileExtensions>
                        <nonFilteredFileExtension>p12</nonFilteredFileExtension>
                        <nonFilteredFileExtension>cer</nonFilteredFileExtension>
                        <nonFilteredFileExtension>pem</nonFilteredFileExtension>
                        <nonFilteredFileExtension>pfx</nonFilteredFileExtension>
                    </nonFilteredFileExtensions>
                </configuration>
            </plugin>
        </plugins>

    </build>
</project>

application-dev.properties

#-----------------Custom application settings---------------------
com.imddy.layuiadmin.title=BOOT
com.imddy.layuiadmin.description=学习一下BOOT是什么
#-----------------Custom application settings---------------------
executeTask=2

#----------Basic database settings (disabled; the primary/second data sources below are used instead)--------------------
#spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/layuiadmin2?useUnicode=true&useSSL=false&characterEncoding=UTF-8&serverTimezone=UTC
#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/layuiadmin2?useUnicode=true&useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull
#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/twmshelp?useUnicode=true&useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
#spring.datasource.username=root
#spring.datasource.password=root__
#spring.datasource.name=HikaraPool-100
#----------Basic connection pool settings--------------------
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
# Name of the connection pool - auto-generated by default
# NOTE(review): "HikaraPool" looks like a typo for "HikariPool"; it is only a display name
spring.datasource.hikari.pool-name=HikaraPool-1
# If your driver supports JDBC4, it is strongly recommended not to set this property.
#spring.datasource.hikari.connection-test-query=select 1
# Connection timeout (ms) - default: 30 seconds.
spring.datasource.hikari.connection-timeout=30000
# Maximum time (ms) a connection may sit idle in the pool - default: 10 minutes
spring.datasource.hikari.idle-timeout=600000
# Maximum lifetime (ms) of a connection; it is released once this is exceeded and it is not in use - default: 30 minutes
spring.datasource.hikari.max-lifetime=1800000
# Maximum number of connections in the pool, idle and in-use combined - default: 10
spring.datasource.hikari.maximum-pool-size=100
# Minimum number of idle connections kept in the pool - default per the original author: 10.
spring.datasource.hikari.minimum-idle=10
# Maximum time (ms) a connection is tested for aliveness - default: 5 seconds.
spring.datasource.hikari.validation-timeout=5000

spring.jpa.show-sql=true
spring.jpa.open-in-view=false

spring.datasource.primary.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.primary.url=jdbc:mysql://127.0.0.1:3306/newbig?useUnicode=true&useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.primary.username=root
spring.datasource.primary.password=root__
# Use HikariDataSource as the data source type
spring.datasource.primary.type=com.zaxxer.hikari.HikariDataSource
# Hikari pool settings; these correspond to the properties of HikariConfig
spring.datasource.primary.hikari.pool-name=HikariCP-Primary
# Minimum number of idle connections
spring.datasource.primary.hikari.minimum-idle=5
# Maximum time (ms) an idle connection is kept alive, default 10 minutes
spring.datasource.primary.hikari.idle-timeout=600000
# Maximum pool size, default 10
spring.datasource.primary.hikari.maximum-pool-size=10
# Default auto-commit behavior of connections returned from the pool, default: true
spring.datasource.primary.hikari.auto-commit=true
# Maximum lifetime (ms) of a connection in the pool; 0 means unlimited, default 30 minutes
spring.datasource.primary.hikari.max-lifetime=1800000
# Database connection timeout (ms), default 30 seconds
spring.datasource.primary.hikari.connection-timeout=30000
# Connection test query; if your driver supports JDBC4, it is strongly recommended not to set this property.
#spring.datasource.primary.hikari.connection-test-query=SELECT 1

spring.datasource.second.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.second.url=jdbc:mysql://192.168.203.150:3306/twms?useUnicode=true&useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.second.username=aliyun_root
spring.datasource.second.password=root__
# NOTE(review): host/port are not standard Spring datasource keys; presumably read by the binlog/CDC source - confirm against the consuming code
spring.datasource.second.host=192.168.203.150
spring.datasource.second.port=3306
# Use HikariDataSource as the data source type
spring.datasource.second.type=com.zaxxer.hikari.HikariDataSource
# Hikari pool settings; these correspond to the properties of HikariConfig
spring.datasource.second.hikari.pool-name=HikariCP-Second
# Minimum number of idle connections
spring.datasource.second.hikari.minimum-idle=5
# Maximum time (ms) an idle connection is kept alive, default 10 minutes
spring.datasource.second.hikari.idle-timeout=600000
# Maximum pool size, default 10
spring.datasource.second.hikari.maximum-pool-size=10
# Default auto-commit behavior of connections returned from the pool, default: true
spring.datasource.second.hikari.auto-commit=true
# Maximum lifetime (ms) of a connection in the pool; 0 means unlimited, default 30 minutes
spring.datasource.second.hikari.max-lifetime=1800000
# Database connection timeout (ms), default 30 seconds
spring.datasource.second.hikari.connection-timeout=30000
# Connection test query; if your driver supports JDBC4, it is strongly recommended not to set this property.
#spring.datasource.second.hikari.connection-test-query=SELECT 1



## Redis settings
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0
spring.redis.timeout=10s
spring.redis.lettuce.pool.min-idle=0
spring.redis.lettuce.pool.max-idle=8
spring.redis.lettuce.pool.max-active=8
spring.redis.lettuce.pool.max-wait=-1ms

这里配置了redis,但实际项目中没有使用。


NbigscreenController.java

package com.cqsym.newbig.controller;


import com.cqsym.newbig.primary.entity.TownerBase;
import com.cqsym.newbig.service.BinLogService;
import com.cqsym.newbig.service.TownerBaseService;
import com.cqsym.newbig.service.XianluMingchengService;
import com.cqsym.newbig.service.ZaituDingdanNumSerivce;
import com.cqsym.newbig.service.ZuoyeNumService;
import com.cqsym.newbig.utils.AjaxResult;
import com.cqsym.newbig.vo.DaquNumVo;
import com.cqsym.newbig.vo.OwnerBaseNumVo;
import com.cqsym.newbig.vo.XianluMingchengVo;
import com.cqsym.newbig.vo.ZaituDingdanVo;
import com.cqsym.newbig.vo.ZhanbiVo;
import com.cqsym.newbig.vo.ZuoyeShishiQingkuangVo;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.List;
import java.util.Map;

/**
 * HTTP endpoints for the big-screen dashboard, all mapped under {@code /nbigscreen}.
 *
 * <p>Every handler returns an {@link AjaxResult} written directly to the response body.
 * The {@code /on} and {@code /off} endpoints delegate to {@link BinLogService#start()} and
 * {@link BinLogService#stop()}; the remaining endpoints delegate to the query services and
 * wrap whatever they return.
 *
 * <p>NOTE(review): the mappings do not restrict the HTTP method, so {@code /on} and
 * {@code /off} can be triggered by a plain GET — confirm this is intended.
 */
@Controller
@RequestMapping("/nbigscreen")
public class NbigscreenController {
    private static final Logger log = LoggerFactory.getLogger(NbigscreenController.class);

    @Autowired
    private BinLogService binLogService;
    @Autowired
    private ZuoyeNumService zuoyeNumService;
    @Autowired
    private XianluMingchengService xianluMingchengService;
    @Autowired
    private ZaituDingdanNumSerivce zaituDingdanNumSerivce;
    @Autowired
    private TownerBaseService townerBaseService;

    /**
     * Simple probe endpoint.
     *
     * @return a success result carrying the text {@code "index"}
     */
    @ResponseBody
    @RequestMapping("/index")
    public AjaxResult index() {
        log.info("index... ");
        return AjaxResult.success("index");
    }

    /**
     * Calls {@code binLogService.start()}.
     *
     * @return a success result, or an error result carrying the exception message if
     *         starting failed
     */
    @ResponseBody
    @RequestMapping("/on")
    public AjaxResult on() {
        log.info("on... ");
        try {
            binLogService.start();
        } catch (Exception exception) {
            // Boundary catch: keep the stack trace in the log, the client only gets the message.
            log.error("on... failed", exception);
            return AjaxResult.error(exception.getMessage());
        }
        return AjaxResult.success();
    }

    /**
     * Calls {@code binLogService.stop()}.
     *
     * @return a success result, or an error result carrying the exception message if
     *         stopping failed
     */
    @ResponseBody
    @RequestMapping("/off")
    public AjaxResult off() {
        log.info("off... ");
        try {
            binLogService.stop();
        } catch (Exception exception) {
            // Boundary catch: keep the stack trace in the log, the client only gets the message.
            log.error("off... failed", exception);
            return AjaxResult.error(exception.getMessage());
        }
        return AjaxResult.success();
    }

    /**
     * Returns the map produced by {@code zuoyeNumService.getZuoyeNum()}.
     *
     * @return a success result wrapping that map
     */
    @ResponseBody
    @RequestMapping("/getZuoyeNum")
    public AjaxResult getZuoyeNum() {
        log.info("getZuoyeNum... ");
        // Wildcard instead of a raw type: the key/value types are not visible from this class.
        Map<?, ?> map = zuoyeNumService.getZuoyeNum();
        return AjaxResult.success(map);
    }

    /**
     * Returns the list produced by {@code zuoyeNumService.getBanshichuZuoyeNum()}.
     *
     * @return a success result wrapping the {@link OwnerBaseNumVo} list
     */
    @ResponseBody
    @RequestMapping("/getBanshichuZuoyeNum")
    public AjaxResult getBanshichuZuoyeNum() {
        log.info("getBanshichuZuoyeNum... ");
        List<OwnerBaseNumVo> list = zuoyeNumService.getBanshichuZuoyeNum();
        return AjaxResult.success(list);
    }

    /**
     * Returns the list produced by {@code zuoyeNumService.getDaquZuoyeNum()}.
     *
     * @return a success result wrapping the {@link DaquNumVo} list
     */
    @ResponseBody
    @RequestMapping("/getDaquZuoyeNum")
    public AjaxResult getDaquZuoyeNum() {
        log.info("getDaquZuoyeNum... ");
        List<DaquNumVo> list = zuoyeNumService.getDaquZuoyeNum();
        return AjaxResult.success(list);
    }

    /**
     * Returns the list produced by {@code zuoyeNumService.getShishiZuoyeQingkuang()}.
     *
     * @return a success result wrapping the {@link ZuoyeShishiQingkuangVo} list
     */
    @ResponseBody
    @RequestMapping("/getShishiZuoyeQingkuang")
    public AjaxResult getShishiZuoyeQingkuang() {
        log.info("getShishiZuoyeQingkuang... ");
        List<ZuoyeShishiQingkuangVo> list = zuoyeNumService.getShishiZuoyeQingkuang();
        return AjaxResult.success(list);
    }

    /**
     * Returns the list produced by {@code xianluMingchengService.getXianluMingcheng()}.
     *
     * @return a success result wrapping the {@link XianluMingchengVo} list
     */
    @ResponseBody
    @RequestMapping("/getXianluMingcheng")
    public AjaxResult getXianluMingcheng() {
        log.info("getXianluMingcheng... ");
        List<XianluMingchengVo> list = xianluMingchengService.getXianluMingcheng();
        return AjaxResult.success(list);
    }

    /**
     * Returns the list produced by
     * {@code zaituDingdanNumSerivce.getZaituDingdanNumForEveryDay()}.
     *
     * @return a success result wrapping the {@link ZaituDingdanVo} list
     * @throws JsonProcessingException propagated from the service call
     */
    @ResponseBody
    @RequestMapping("/getZaituDingdanNumForEveryDay")
    public AjaxResult getZaituDingdanNumForEveryDay() throws JsonProcessingException {
        log.info("getZaituDingdanNumForEveryDay... ");
        List<ZaituDingdanVo> list = zaituDingdanNumSerivce.getZaituDingdanNumForEveryDay();
        return AjaxResult.success(list);
    }

    /**
     * Returns every {@link TownerBase} from {@code townerBaseService.findAll()}.
     *
     * @return a success result wrapping the {@link TownerBase} list
     */
    @ResponseBody
    @RequestMapping("/getTOwnerBaseAll")
    public AjaxResult getTOwnerBaseAll() {
        log.info("getTOwnerBaseAll... ");
        List<TownerBase> list = townerBaseService.findAll();
        return AjaxResult.success(list);
    }

    /**
     * Returns the list produced by {@code zuoyeNumService.getZuoyeZhanbi()}.
     *
     * @return a success result wrapping the {@link ZhanbiVo} list
     */
    @ResponseBody
    @RequestMapping("/getZuoyeZhanbi")
    public AjaxResult getZuoyeZhanbi() {
        log.info("getZuoyeZhanbi... ");
        List<ZhanbiVo> list = zuoyeNumService.getZuoyeZhanbi();
        return AjaxResult.success(list);
    }

}


二、各种Service


举报

相关推荐

0 条评论