dinky-binlog-kafka-flinksql processing flow

Preparation:

mysql:

Enable the MySQL binary log (binlog).
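
One way to confirm the binlog settings is to query the server variables from a MySQL client. This is a minimal sketch, not part of the original setup; it relies on the fact that Maxwell reads row-based binlogs, so `binlog_format` should be `ROW`.

```sql
-- Run in a MySQL client connected to the source server (172.18.1.5 in this post).
SHOW VARIABLES LIKE 'log_bin';        -- should be ON once the binlog is enabled
SHOW VARIABLES LIKE 'binlog_format';  -- should be ROW for Maxwell
SHOW VARIABLES LIKE 'server_id';      -- should be set to a non-zero value
```

If any of these differ, adjust the MySQL configuration file and restart MySQL before starting maxwell.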

kafka:

Check that the Kafka service is running normally.

maxwell:

Version 1.19 is used here. Newer versions require a newer Java version, and this environment runs Java 8.

maxwell-1.19.0

Running maxwell:

cd /root/tar_temp/maxwell-1.19.0
bin/maxwell --user='root' --password='1017~Fulin' --port=3306 --host='172.18.1.5' --producer=kafka --kafka.bootstrap.servers=172.18.1.16:9092 --kafka_topic=test01 --filter="exclude:*.*, include:shufu_tag.t_call_record"

Once maxwell is running, execute an insert in the corresponding MySQL database.
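
A test insert might look like the sketch below. The column names are taken from the Flink table definition later in this post and are assumed to exist in `shufu_tag.t_call_record`; the values are made up, and all other columns are assumed to be nullable or to have defaults.

```sql
-- Hypothetical test row for the table that the maxwell filter includes.
-- Adjust the column list to match the real table definition.
INSERT INTO shufu_tag.t_call_record (plan_detail_code, task_code, callee_phone)
VALUES ('demo_plan_001', 'demo_task_001', '13800000000');
```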

Then run the Kafka console consumer:

cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka
./bin/kafka-console-consumer.sh --bootstrap-server 172.18.1.16:9092 --topic test01 --from-beginning

The consumer should print the change record for the insert:

[Screenshot: kafka-console-consumer output]

If it does, this stage works.

flink

The dinky platform is used here. Add the Kafka connector and kafka-client jars to plugins, then restart and run the following code:

CREATE TABLE kafka_input (
    -- Top-level keys of each Maxwell JSON message.
    `database` string,
    `table` string,
    `type` string,
    -- The json format maps columns to keys by name. Maxwell emits `ts`, `xid`
    -- and `commit`, so these three columns read as NULL unless renamed to match.
    ts_get string,
    xid_get string,
    commit_get string,
    -- title ROW<id INT, name STRING, level INT>,
    -- Row image of the changed record, nested under the `data` key.
    `data` ROW<
        id_get string, plan_detail_code string, task_code string, callee_phone string,
        record_id string, record_status string, tag string, staff_tag string,
        duration string, start_time string, answer_time string, end_time string,
        intervene_time string, call_media_url string, remark string, created_time string,
        call_round string, finish_call_times string, custom_guide string, update_time string,
        visit_guide string, call_stage integer
    >
) WITH (
    'connector' = 'kafka',
    'topic' = 'test01',
    'properties.bootstrap.servers' = '172.18.1.16:9092',
    'properties.group.id' = 'for_source_test',
    'scan.startup.mode' = 'latest-offset',   -- read only messages produced after the job starts
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',  -- missing keys become NULL instead of failing
    'json.ignore-parse-errors' = 'false'     -- malformed JSON still fails the job
);

select `database`,`table`,`type`,`data`.plan_detail_code from kafka_input;
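
Because every Maxwell message carries a `type` key (`insert`, `update` or `delete`), the same table can be filtered by operation. The query below is a small sketch of that idea, not part of the original job; it selects only insert events and adds `task_code` from the nested `data` row.

```sql
-- Keep only insert events from the kafka_input table defined in this post.
SELECT `database`, `table`, `data`.plan_detail_code, `data`.task_code
FROM kafka_input
WHERE `type` = 'insert';
```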


It runs successfully!
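
As an alternative sketch, Flink also provides a `maxwell-json` format that interprets Maxwell messages as a changelog, so the table columns name the MySQL columns directly instead of going through a nested `data` row. This assumes the Flink version behind dinky ships that format, which should be checked first; the table name `kafka_input_cdc` and the group id are hypothetical, and only a few columns are listed.

```sql
-- Sketch only: requires a Flink version that includes the maxwell-json format.
CREATE TABLE kafka_input_cdc (
    plan_detail_code STRING,
    task_code STRING,
    call_stage INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'test01',
    'properties.bootstrap.servers' = '172.18.1.16:9092',
    'properties.group.id' = 'for_source_test_cdc',  -- hypothetical group id
    'scan.startup.mode' = 'latest-offset',
    'format' = 'maxwell-json'
);

SELECT plan_detail_code, task_code, call_stage FROM kafka_input_cdc;
```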
