目录
1 批处理api开发(scala语言开发,记得创建一个maven工程之后就添加scala语言)
环境:三台虚拟机 (qianfeng01:8081,qianfeng02:8081,qianfeng03:8081)
idea开发
pom.xml(这个版本号就根据自己机器的版本号修改,不同版本真的存在兼容性问题)
1 批处理api开发(scala语言开发,记得创建一个maven工程之后就添加scala语言)
就是在工程的src文件夹同级建一个data文件夹,里面自定义一个文件,写单词
下图就是我的test.txt
变形
就是去掉打印控制台,添加并行度,以及保存文件和执行
变形(把死的文件路径改成活的args)
传参的设置
2 java的批处理
变形
3 scala的流式编程
程序边执行的时候,要在虚拟机上敲一下nc -l 8888
4 java流式编程的代码
打包上传到虚拟机里面
然后编写一个词汇的文档
在虚拟机中的运行代码(我这边有错误,别跟着我)
5 source (基本版本)
对于source的一个写法,有两种
6 mysq作为一个source
我的mysql装在qianfeng03上面了。
在数据库里面建表 插数据
7 kafka作为source
7.1 老版本的kafka的api
打开虚拟机
因为我三台机器都开了,所以kafka服务端就开了三个ip
虚拟机
上面就是kafka生产者
先运行一下idea上面的程序
然后在虚拟机的生产者后面敲内容
这个程序会一直运行的哦~
现在敲新代码(初级版本,就是敲了一个kafka的端口,组名,序列化,指定位置,还能跑哈哈哈)
注意flink和kafka的api默认那个watermark是开了的,但是这个项目我们要关掉~
7.2 新版本的kafka基础版(就是偏移量就没自定义)