0
点赞
收藏
分享

微信扫一扫

Flink的api入门案例

松鼠树屋 2022-04-21 阅读 62

目录

1 批处理api开发(scala语言开发,记得创建一个maven工程之后就添加scala语言)

2 java的批处理

 3 scala的流式编程

4  java流式编程的代码

5 source (基本版本)

 6 mysq作为一个source

 7 kafka作为source

7.1 老版本的kafka的api

7.2 新版本的kafka基础版(就是偏移量就没自定义)

 7.3 新版本的升级版本,指定偏移量


环境:三台虚拟机 (qianfeng01:8081,qianfeng02:8081,qianfeng03:8081)

idea开发

pom.xml(这个版本号就根据自己机器的版本号修改,不同版本真的存在兼容性问题)

1 批处理api开发(scala语言开发,记得创建一个maven工程之后就添加scala语言)

就是在工程的src文件夹同级建一个data文件夹,里面自定义一个文件,写单词

下图就是我的test.txt

 变形

就是去掉打印控制台,添加并行度,以及保存文件和执行

变形(把死的文件路径改成活的args)

传参的设置

 

2 java的批处理

 变形

 3 scala的流式编程

 程序边执行的时候,要在虚拟机上敲一下nc -l 8888

4  java流式编程的代码

 打包上传到虚拟机里面

然后编写一个词汇的文档

在虚拟机中的运行代码(我这边有错误,别跟着我)

5 source (基本版本)

 对于source的一个写法,有两种

 6 mysq作为一个source

我的mysql装在qianfeng03上面了。

 在数据库里面建表 插数据

 7 kafka作为source

7.1 老版本的kafka的api

打开虚拟机

因为我三台机器都开了,所以kafka服务端就开了三个ip

虚拟机

 

上面就是kafka生产者

先运行一下idea上面的程序

然后在虚拟机的生产者后面敲内容

这个程序会一直运行的哦~

现在敲新代码(初级版本,就是敲了一个kafka的端口,组名,序列化,指定位置,还能跑哈哈哈)

注意flink和kafka的api默认那个watermark是开了的,但是这个项目我们要关掉~

7.2 新版本的kafka基础版(就是偏移量就没自定义)

 

 7.3 新版本的升级版本,指定偏移量

举报

相关推荐

0 条评论