0
点赞
收藏
分享

微信扫一扫

Flink教程(06)- Flink批流一体API(Source示例)



文章目录

  • 01 引言
  • 02 Source
  • 2.1 基于集合的Source
  • 2.2 基于文件的Source
  • 2.3 基于Socket的Source
  • 2.4 自定义Source
  • 2.4.1 案例 - 随机生成数据
  • 2.4.2 案例 - MySQL
  • 03 文末

01 引言

在前面的博客,我们已经对Flink的原理有了一定的了解了,有兴趣的同学可以参阅下:

  • 《Flink教程(01)- Flink知识图谱》
  • 《Flink教程(02)- Flink入门》
  • 《Flink教程(03)- Flink环境搭建》
  • 《Flink教程(04)- Flink入门案例》
  • 《Flink教程(05)- Flink原理简单分析》

本文开始讲解​​Flink​​​程序模型对应的代码,也就是​​Flink​​​批流一体对应的​​API​​​,分别对应为:​​Source​​​ 、​​Transformation​​​、​​Slink​​​,本文讲​​Source​​。

Flink教程(06)- Flink批流一体API(Source示例)_flink

02 Source

​Source​​对应的就是Flink编程模型里面的​​Data Source​​数据源:

Flink教程(06)- Flink批流一体API(Source示例)_hadoop_02

在​​​Flink​​官网​​,我们可以知道​​Source​​有如下几种类型:

Flink教程(06)- Flink批流一体API(Source示例)_hadoop_03

转义为中文即:

  • File-based​:基于文件的的Source
  • Socket-based​: 基于Socket的Source
  • Collection-based​: 基于集合的Source
  • Custom​: 自定义Source

2.1 基于集合的Source

相关API(一般用于学习测试时编造数据时使用):

  • ​env.fromElements​​(可变参数);
  • ​env.fromColletion​​(各种集合);
  • ​env.generateSequence​​(开始,结束);
  • ​env.fromSequence​​(开始,结束)。

示例代码:

/**
* 把本地的普通的Java集合/Scala集合变为分布式的Flink的DataStream集合!
*
* @author : YangLinWei
* @createTime: 2022/3/7 2:55 下午
* <p>
* 1.env.fromElements(可变参数);
* 2.env.fromColletion(各种集合);
* 3.env.generateSequence(开始,结束);
* 4.env.fromSequence(开始,结束);
*/
public class SourceDemo1 {

public static void main(String[] args) throws Exception {

//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//2.source
// * 1.env.fromElements(可变参数);
DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
// * 2.env.fromColletion(各种集合);
DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
// * 3.env.generateSequence(开始,结束);
DataStream<Long> ds3 = env.generateSequence(1, 10);
//* 4.env.fromSequence(开始,结束);
DataStream<Long> ds4 = env.fromSequence(1, 10);

//3.Transformation

//4.sink
ds1.print();
ds2.print();
ds3.print();
ds4.print();

//5.execute
env.execute();
}
}

运行结果:

Flink教程(06)- Flink批流一体API(Source示例)_mapreduce_04

2.2 基于文件的Source

相关API(一般用于学习测试):

  • ​env.readTextFile​​(本地/HDFS文件/文件夹/压缩文件)

示例代码:

/**
* env.readTextFile(本地/HDFS文件/文件夹/压缩文件)
*
* @author : YangLinWei
* @createTime: 2022/3/7 2:59 下午
*/
public class SourceDemo2 {

public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//2.source
// * 1.env.readTextFile(本地文件/HDFS文件);//压缩文件也可以
DataStream<String> ds1 = env.readTextFile("data/input/words.txt");
DataStream<String> ds2 = env.readTextFile("data/input/dir");
DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020//wordcount/input/words.txt");
DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");

//3.Transformation

//4.sink
ds1.print();
ds2.print();
ds3.print();
ds4.print();

//5.execute
env.execute();
}
}

2.3 基于Socket的Source

需求:在​​node1​​​上使用​​nc -lk 9999​​​ 向指定端口发送数据(​​nc​​​是​​netcat​​的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据),如果没有该命令可以下安装:

yum install -y nc

使用​​Flink​​编写流处理应用程序实时统计单词数量,代码如下:

/**
* SocketSource
*
* @author : YangLinWei
* @createTime: 2022/3/7 3:02 下午
*/
public class SourceDemo3 {

public static void main(String[] args) throws Exception {

//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//2.source
DataStream<String> linesDS = env.socketTextStream("node1", 9999);

//3.处理数据-transformation
//|_____3.1每一行数据按照空格切分成一个个的单词组成一个集合
DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//value就是一行行的数据
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);//将切割处理的一个个的单词收集起来并返回
}
}
});
//|_____3.2对集合中的每个单词记为1
DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
//value就是进来一个个的单词
return Tuple2.of(value, 1);
}
});
//|_____3.3对数据按照单词(key)进行分组
//KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
//|_____3.4对各个组内的数据按照数量(value)进行聚合就是求sum
DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

//4.输出结果-sink
result.print();

//5.触发执行-execute
env.execute();
}
}

2.4 自定义Source

2.4.1 案例 - 随机生成数据

Flink还提供了数据源接口,我们实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:

  • ​SourceFunction​​:非并行数据源(并行度只能=1)
  • ​RichSourceFunction​​:多功能非并行数据源(并行度只能=1)
  • ​ParallelSourceFunction​​:并行数据源(并行度能够>=1)
  • ​RichParallelSourceFunction​​:多功能并行数据源(并行度能够>=1) ,Kafka数据源使用的就是该接口。

需求:每隔1秒随机生成一条订单信息(订单​​ID​​、用户​​ID​​、订单金额、时间戳)

要求:

  • 随机生成订单​​ID​​​(​​UUID​​)
  • 随机生成用户​​ID​​(0-2)
  • 随机生成订单金额(0-100)
  • 时间戳为当前系统时间

示例代码:

/**
* 自定义Source
*
* @author : YangLinWei
* @createTime: 2022/3/7 3:08 下午
* Flink还提供了数据源接口,我们实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:
* SourceFunction:非并行数据源(并行度只能=1)
* RichSourceFunction:多功能非并行数据源(并行度只能=1)
* ParallelSourceFunction:并行数据源(并行度能够>=1)
* RichParallelSourceFunction:多功能并行数据源(并行度能够>=1)--后续学习的Kafka数据源使用的就是该接口
*/
public class SourceDemo4 {


public static void main(String[] args) throws Exception {

//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//2.Source
DataStream<Order> orderDS = env
.addSource(new MyOrderSource())
.setParallelism(2);

//3.Transformation

//4.Sink
orderDS.print();
//5.execute
env.execute();
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
private String id;
private Integer userId;
private Integer money;
private Long createTime;
}

public static class MyOrderSource extends RichParallelSourceFunction<Order> {
private Boolean flag = true;

@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random random = new Random();
while (flag) {
Thread.sleep(1000);
String id = UUID.randomUUID().toString();
int userId = random.nextInt(3);
int money = random.nextInt(101);
long createTime = System.currentTimeMillis();
ctx.collect(new Order(id, userId, money, createTime));
}
}

//取消任务/执行cancle命令的时候执行
@Override
public void cancel() {
flag = false;
}
}
}

运行结果如下:

Flink教程(06)- Flink批流一体API(Source示例)_Flink_05

2.4.2 案例 - MySQL

需求​:实际开发中,经常会实时接收一些数据,要和​​MySQL​​​中存储的一些规则进行匹配,那么这时候就可以使用​​Flink​​​自定义数据源从​​MySQL​​中读取数据。

那么现在先完成一个简单的需求:

  • 从​​MySQL​​中实时加载数据;
  • 要求​​MySQL​​中的数据有变化,也能被实时加载出来。

首先准备数据:

CREATE TABLE `t_student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

INSERT INTO `t_student` VALUES ('1', 'jack', '18');
INSERT INTO `t_student` VALUES ('2', 'tom', '19');
INSERT INTO `t_student` VALUES ('3', 'rose', '20');
INSERT INTO `t_student` VALUES ('4', 'tom', '19');
INSERT INTO `t_student` VALUES ('5', 'jack', '18');
INSERT INTO `t_student` VALUES ('6', 'rose', '20');

代码实现如下:

/**
* 简单的需求:
* 从MySQL中实时加载数据
* 要求MySQL中的数据有变化,也能被实时加载出来
*
* @author : YangLinWei
* @createTime: 2022/3/7 3:17 下午
*/
public class SourceDemo5 {

public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//2.Source
DataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);

//3.Transformation
//4.Sink
studentDS.print();

//5.execute
env.execute();
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Student {
private Integer id;
private String name;
private Integer age;
}

public static class MySQLSource extends RichParallelSourceFunction<Student> {
private Connection conn = null;
private PreparedStatement ps = null;

@Override
public void open(Configuration parameters) throws Exception {
//加载驱动,开启连接
//Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/big_data", "root", "123456");
String sql = "select id,name,age from t_student";
ps = conn.prepareStatement(sql);
}

private boolean flag = true;

@Override
public void run(SourceContext<Student> ctx) throws Exception {
while (flag) {
ResultSet rs = ps.executeQuery();
while (rs.next()) {
int id = rs.getInt("id");
String name = rs.getString("name");
int age = rs.getInt("age");
ctx.collect(new Student(id, name, age));
}
TimeUnit.SECONDS.sleep(5);
}
}

@Override
public void cancel() {
flag = false;
}

@Override
public void close() throws Exception {
if (conn != null) conn.close();
if (ps != null) ps.close();
}
}
}

运行结果:

Flink教程(06)- Flink批流一体API(Source示例)_Flink_06

03 文末

本文主要讲解Flink批流一体API中的Source用法,谢谢大家的阅读,本文完!



举报

相关推荐

0 条评论