Flink CDC Series (7): From MySQL to Elasticsearch

Introduction

This article walks through an example of synchronizing MySQL data to Elasticsearch with Flink CDC and Flink SQL. The example covers insert, update, and delete operations.

System environment and software versions

Ubuntu 20.04
JDK 1.8
Maven 3.6.3
Flink 1.13.6
Elasticsearch 7.16.2

Preparing the MySQL test data


mysql> CREATE DATABASE mydb;

mysql> USE mydb;

mysql> CREATE TABLE products (
       id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
       name VARCHAR(255) NOT NULL,
       description VARCHAR(512)
     );

mysql> INSERT INTO products VALUES (default,"scooter1","Small 1-wheel scooter");
Query OK, 1 row affected (0.01 sec)

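Installing Elasticsearch

1. Choose and download the package

Official download page:
https://www.elastic.co/cn/downloads/past-releases/elasticsearch-7-16-2
Choose the package that matches your operating system and CPU architecture. On an Apple M1 machine, or in a virtual machine running on one, choose the package with the AARCH64 suffix.
The author's environment is Ubuntu 20.04 running on an Apple M1 chip, so the LINUX AARCH64 package is used here.

Download command: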

axel -n 20 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.16.2-linux-aarch64.tar.gz

2. Extract the archive

tar xvf elasticsearch-7.16.2-linux-aarch64.tar.gz

3. Start Elasticsearch

cd elasticsearch-7.16.2
bin/elasticsearch

4. Verify that Elasticsearch started

curl http://localhost:9200

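If the command returns a JSON response with the node name, cluster name, and version information, Elasticsearch started successfully.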
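
Elasticsearch needs some time to finish starting, so a single curl issued right after launch can fail even though the node is fine. A minimal sketch of a polling helper follows; the function name wait_for_es and its retry defaults are illustrative assumptions, not part of the original setup.

```shell
# Hypothetical helper: poll an HTTP endpoint until it answers or we give up.
# Usage: wait_for_es URL [MAX_ATTEMPTS] [SECONDS_BETWEEN_ATTEMPTS]
wait_for_es() {
  url=$1
  tries=${2:-30}
  delay=${3:-2}
  i=0
  while [ "$i" -lt "$tries" ]; do
    # -s: silent, -f: fail on HTTP errors, --max-time: cap each attempt,
    # -o /dev/null: discard the response body
    if curl -sf --max-time 5 -o /dev/null "$url"; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  # The endpoint never answered within the allowed attempts.
  return 1
}
```

For example, `wait_for_es http://localhost:9200 && echo "Elasticsearch is up"` keeps polling with the default settings and prints the message as soon as the node responds.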

Preparing the Flink cluster

1. Download the Flink 1.13.6 binary distribution

axel -n 20 https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz

2. Extract the archive

tar xvf flink-1.13.6-bin-scala_2.11.tgz

3. Copy flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar into the Flink lib directory. This JAR is built from the Flink CDC source code.

cp /opt/flink-cdc-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar /opt/flink-1.13.6/lib

For how to build flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar from the Flink CDC source code, see:
"Flink CDC Series (2): Building Flink CDC from Source"

4. Edit /opt/flink-1.13.6/conf/workers

vi /opt/flink-1.13.6/conf/workers

Contents of the workers file:

localhost
localhost

Each line starts one worker (TaskManager) process, so this configuration starts two of them on the local machine.

5. Edit /opt/flink-1.13.6/conf/flink-conf.yaml

vi  /opt/flink-1.13.6/conf/flink-conf.yaml

Set the parameter: taskmanager.numberOfTaskSlots: 2
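
The same change can be made without opening an editor. The sketch below assumes flink-conf.yaml consists of flat `key: value` lines; set_conf is a hypothetical helper name introduced only for illustration.

```shell
# Hypothetical helper: set "key: value" in a flat YAML file such as flink-conf.yaml.
# Usage: set_conf KEY VALUE FILE
# Assumes KEY and VALUE contain no '|' or '&' characters (they are special to sed here).
set_conf() {
  key=$1
  value=$2
  file=$3
  if grep -q "^${key}:" "$file"; then
    # The key is already present: rewrite that line in place.
    # -i.bak keeps a backup copy and works with both GNU and BSD sed.
    sed -i.bak "s|^${key}:.*|${key}: ${value}|" "$file"
  else
    # The key is missing: append it at the end of the file.
    printf '%s: %s\n' "$key" "$value" >> "$file"
  fi
}
```

A call such as `set_conf taskmanager.numberOfTaskSlots 2 /opt/flink-1.13.6/conf/flink-conf.yaml` applies the setting from this step, and `grep numberOfTaskSlots /opt/flink-1.13.6/conf/flink-conf.yaml` confirms the result.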

6. Download the Flink Hadoop uber JAR, flink-shaded-hadoop-2-uber-2.7.5-10.0.jar, and copy it into /opt/flink-1.13.6/lib

7. Download the Flink Elasticsearch connector JAR, flink-sql-connector-elasticsearch7_2.11-1.13.6.jar, and copy it into /opt/flink-1.13.6/lib
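
Steps 6 and 7 do not say where the JAR files come from. One option, assuming both artifacts are published on Maven Central under the org.apache.flink group (an assumption worth checking before relying on it), is to build the download URL from the Maven coordinates. The helper name maven_url is hypothetical; the URL pattern is the standard Maven repository layout.

```shell
# Hypothetical helper: print the Maven Central URL for GROUP ARTIFACT VERSION.
maven_url() {
  # Maven stores artifacts under the group id with dots turned into slashes.
  group_path=$(printf '%s' "$1" | tr '.' '/')
  printf 'https://repo1.maven.org/maven2/%s/%s/%s/%s-%s.jar\n' \
    "$group_path" "$2" "$3" "$2" "$3"
}

# Print the URLs for the two JARs from steps 6 and 7 (nothing is downloaded here).
maven_url org.apache.flink flink-shaded-hadoop-2-uber 2.7.5-10.0
maven_url org.apache.flink flink-sql-connector-elasticsearch7_2.11 1.13.6
```

Each URL can then be handed to a downloader, for example `curl -fLO "$(maven_url org.apache.flink flink-sql-connector-elasticsearch7_2.11 1.13.6)"` run from /opt/flink-1.13.6/lib.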

8. Start the standalone cluster

cd /opt/flink-1.13.6
bin/start-cluster.sh

9. Check that the JobManager and TaskManager processes are running

$ jps -m
9824 Jps -m
9143 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
8875 StandaloneSessionClusterEntrypoint --configDir /opt/flink-1.13.6/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=469762048b -D jobmanager.memory.jvm-overhead.max=201326592b
9403 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
9727 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
ubuntu@ubuntu:/opt/flink-1.13.6$

Demo

1. Start the Flink SQL Client

cd /opt/flink-1.13.6
bin/sql-client.sh

2. Run the DDL statements and queries in the Flink SQL Client

-- Create the mysql-cdc source table
Flink SQL> CREATE TABLE products (
     id INT,
     name STRING,
     description STRING,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = '192.168.64.6',
     'port' = '3306',
     'username' = 'test',
     'password' = 'test',
     'database-name' = 'mydb',
     'table-name' = 'products'
   );
[INFO] Execute statement succeed.
Flink SQL> select * from products;
id                 name                 description
1                 scooter1          Small 1-wheel scooter

-- Create the elasticsearch-7 sink table
Flink SQL> CREATE TABLE products_es_sink (
     id INT,
     name STRING,
     description STRING,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'products'
   );
[INFO] Execute statement succeed.

-- Submit the job that continuously writes changes from products into the sink
Flink SQL> insert into products_es_sink select * from products;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b962baa7f6a8890cc45e43a7c95765d2

3. Check the data in the Elasticsearch index

curl 'http://localhost:9200/products/_search?pretty'
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "name" : "scooter1",
          "description" : "Small 1-wheel scooter"
        }
      }
    ]
  }
}

4. Insert a new row from the MySQL client

mysql> INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter");

5. Check the data in the Elasticsearch index
Run on the command line:

curl 'http://localhost:9200/products/_search?pretty'
{
  "took" : 433,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "name" : "scooter1",
          "description" : "Small 1-wheel scooter"
        }
      },
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "scooter2",
          "description" : "Small 2-wheel scooter"
        }
      }
    ]
  }
}
-- The new row has been written to Elasticsearch

6. Update a row from the MySQL client

mysql> update products set name = 'scooter----1' where id = 1;

7. Check the data in the Elasticsearch index
Run on the command line:

curl 'http://localhost:9200/products/_search?pretty'
{
  "took" : 154,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "scooter2",
          "description" : "Small 2-wheel scooter"
        }
      },
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "name" : "scooter----1",
          "description" : "Small 1-wheel scooter"
        }
      }
    ]
  }
}
-- The update to the row with id=1 has been applied in Elasticsearch

8. Delete a row from the MySQL client

mysql> delete from products where id = 1;

9. Check the data in the Elasticsearch index

Run on the command line:

curl 'http://localhost:9200/products/_search?pretty'
{
  "took" : 347,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "scooter2",
          "description" : "Small 2-wheel scooter"
        }
      }
    ]
  }
}

-- The row with id=1 has been deleted from Elasticsearch

Summary

Flink CDC captures MySQL insert, update, and delete operations from the change log, and Flink SQL applies the corresponding inserts, updates, and deletes to the data in the Elasticsearch index.
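
Updates and deletes reach the right document because the sink table declares a primary key: in the search results above, each document's `_id` equals the product id, so a change overwrites or removes the document stored under that key. The toy model below only illustrates that idea and is not Flink's implementation; it uses a temporary directory as a stand-in for the index (one file per document id), and apply_change is a hypothetical name.

```shell
# Toy model of keyed upsert/delete: a directory plays the index, one file per id.
# Usage: apply_change OP ID DOCUMENT INDEX_DIR
apply_change() {
  op=$1
  id=$2
  doc=$3
  index_dir=$4
  case "$op" in
    insert|update)
      # Upsert: store the document under its key, replacing any older version.
      printf '%s\n' "$doc" > "$index_dir/$id"
      ;;
    delete)
      # Delete: remove the document stored under that key, if any.
      rm -f "$index_dir/$id"
      ;;
    *)
      echo "unknown operation: $op" >&2
      return 1
      ;;
  esac
}

# Replay the same sequence of changes as the demo.
index_dir=$(mktemp -d)
apply_change insert 1 '{"id":1,"name":"scooter1"}' "$index_dir"
apply_change insert 2 '{"id":2,"name":"scooter2"}' "$index_dir"
apply_change update 1 '{"id":1,"name":"scooter----1"}' "$index_dir"
apply_change delete 1 '' "$index_dir"
ls "$index_dir"   # prints 2: only the document with id 2 remains
```

The final state matches the last search result in the demo, where only the document with `_id` 2 is left.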
