[Flink-API] A Review of the Transformation/Sink Operations

WikongGuan · 2022-10-28


1. Map

1.1 Introduction

1. A DataStream -> DataStream transformation.
2. Each element of the input is mapped to exactly one output element (one-to-one).

1.2 MapFunction

public class Map01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStream<Integer> nums = env.fromElements(1, 2, 3, 4, 5);

        // Implementation 1: map() doubles every input element (same input and output type)
        SingleOutputStreamOperator<Integer> result1 = nums.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value * 2;
            }
        });

        // Implementation 2: the same mapping as a lambda expression
        SingleOutputStreamOperator<Integer> result2 = nums.map(i -> i * 2);

        // sink
        result1.print();
        result2.print();
        env.execute();
    }
}

Result:

[screenshot: console output]

1.3 RichMapFunction

1. open() runs once, after the constructor and before the first map() call; it is the initialization hook, e.g. for opening database connections.
2. The Configuration parameter carries the global configuration.
3. close() runs once before the function is torn down, typically to release resources.

The code:

// RichMapFunction
nums.map(new RichMapFunction<Integer, Integer>() {

    // open(): runs once, after the constructor and before map();
    // the initialization hook, e.g. for database connections.
    // Configuration carries the global configuration.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public Integer map(Integer value) throws Exception {
        return value * 10;
    }

    // close(): runs once before teardown, typically to release resources
    @Override
    public void close() throws Exception {
        super.close();
    }
});
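To make open() concrete, here is a minimal sketch that reads a multiplier from the global job parameters once instead of on every record; it reuses the nums stream from Map01, and the parameter name "factor" is a hypothetical choice, not part of the original post:

nums.map(new RichMapFunction<Integer, Integer>() {
    private int factor;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // runs once per parallel instance, before the first map() call;
        // "factor" is a hypothetical parameter registered via env.getConfig().setGlobalJobParameters(...)
        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        factor = params.getInt("factor", 10);
    }

    @Override
    public Integer map(Integer value) throws Exception {
        return value * factor;
    }
}).print();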

2. FlatMap

2.1 Introduction

One input element can be split into multiple output elements: a one-to-many mapping.

2.2 FlatMapFunction

public class FlatMap01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements("GW001001 GW002002 GW003003", "GW001001 GW002002 GW003003", "GW001001 GW002002 GW003003");

        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {

                // Implementation 1: JDK 8 streams with a method reference [recommended]
                //Arrays.stream(line.split(" ")).forEach(out::collect);

                // Implementation 2: forEach with a lambda
                //Arrays.asList(line.split(" ")).forEach(w -> out.collect(w));

                // Implementation 3: the plain loop
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });

        words.print();
        env.execute();
    }
}

Result:

[screenshot: console output]
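A side note: when flatMap is written as a lambda, the element type of the Collector is erased at compile time, so Flink needs an explicit returns(...) hint (the same trick the keyBy examples below use for Tuple2). A minimal sketch, reusing the lines stream from FlatMap01:

SingleOutputStreamOperator<String> words2 = lines
        .flatMap((String line, Collector<String> out) ->
                Arrays.stream(line.split(" ")).forEach(out::collect))
        .returns(Types.STRING);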

2.3 RichFlatMapFunction

Like RichMapFunction, the rich variant adds the open() and close() lifecycle methods, as sketched below.
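A minimal sketch of the rich variant, reusing the lines stream from FlatMap01 above:

lines.flatMap(new RichFlatMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);  // one-time initialization, e.g. opening a connection
    }

    @Override
    public void flatMap(String line, Collector<String> out) throws Exception {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();  // one-time cleanup
    }
});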

3. Filter

3.1 Introduction

1. Returning true keeps the element; returning false filters it out.
2. The example applies a predicate to each input element: is it an odd number?

3.2 FilterFunction

public class Filter01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> nums = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);

        // Implementation 1: true keeps the element, false drops it
        SingleOutputStreamOperator<Integer> filter1 = nums.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value % 2 != 0;
            }
        });

        // Implementation 2: a lambda expression
        //SingleOutputStreamOperator<Integer> filter2 = nums.filter(i -> i >= 5);
        SingleOutputStreamOperator<Integer> filter2 = nums.filter(i -> {
            // a multi-line lambda body needs an explicit return
            return i >= 5;
        });

        filter1.print();
        filter2.print();

        env.execute();
    }
}

Result:

[screenshot: console output]

4. KeyBy

An operator for real-time, keyed computation.

4.1 Lambda implementation

1. Map each input element to one output element, using a lambda expression instead of new MapFunction; a socket opened on a virtual machine feeds the job for real-time aggregation.
2. On the CentOS VM: nc -lk 8888
3. A tuple is a special kind of collection; fields are indexed from 0, and tuples go up to Tuple25.
4. The code:

public class KeyBy01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);

        // a lambda expression instead of new MapFunction:
        // one input element maps to one output element
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(w -> Tuple2.of(w, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // a tuple is a special collection: fields are indexed from 0, up to Tuple25
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        // print the keyed stream (no aggregation yet)
        keyed.print();

        env.execute("KeyBy01");
    }
}

Result:

[screenshot: console output]
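One caveat: on newer Flink versions, index-based keyBy(0) is deprecated in favor of a KeySelector. An equivalent form for the job above, as a sketch:

KeyedStream<Tuple2<String, Integer>, String> keyed2 = wordAndOne.keyBy(t -> t.f0);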

4.2 keyBy on a custom entity class

1. The entity class WordAndCount. Flink needs public fields (or getters/setters) and a no-argument constructor to treat the class as a POJO, so that keyBy("word") and sum("counts") can resolve the fields by name:

public class WordAndCount {
    public String word;
    public Long counts;

    public WordAndCount() {}

    public WordAndCount(String word, Long counts) {
        this.word = word;
        this.counts = counts;
    }
}

2. keyBy and sum:

public class KeyBy02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8808);

        // one input element maps to one output element
        SingleOutputStreamOperator<WordAndCount> wordAndOne = lines.map(new MapFunction<String, WordAndCount>() {
            @Override
            public WordAndCount map(String value) throws Exception {
                return new WordAndCount(value, 1L);
            }
        });

        // group by a field of the entity class
        KeyedStream<WordAndCount, Tuple> keyed = wordAndOne.keyBy("word");

        // aggregate: sum the counts field
        SingleOutputStreamOperator<WordAndCount> sumed = keyed.sum("counts");

        keyed.print();
        sumed.print();

        env.execute();
    }
}

Result:

[screenshot: console output]
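The KeySelector form also works for entity-class fields and avoids the deprecated string-based keyBy; a sketch, relying on the public word field of the POJO above:

KeyedStream<WordAndCount, String> keyed2 = wordAndOne.keyBy(wc -> wc.word);
SingleOutputStreamOperator<WordAndCount> sumed2 = keyed2.sum("counts");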

4.3 keyBy grouping on multiple fields

The code:

public class KeyBy03 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8828);

        // sample input lines (province,city,money):
        //   山东,烟台,2000
        //   山东,烟台,2000
        //   山东,烟台,2000
        SingleOutputStreamOperator<Tuple3<String, String, Double>> provinceCityMoney = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
            @Override
            public Tuple3<String, String, Double> map(String line) throws Exception {
                // split the line into province, city, money
                String[] fields = line.split(",");
                String province = fields[0];
                String city = fields[1];
                double money = Double.parseDouble(fields[2]);
                return Tuple3.of(province, city, money);
            }
        });

        // group by province and city (multiple fields), then aggregate the last field;
        // with a custom bean/entity class, field names can be used instead of indexes
        SingleOutputStreamOperator<Tuple3<String, String, Double>> summed = provinceCityMoney.keyBy(0, 1).sum(2);
        summed.print();
        env.execute();
    }
}

Result:

[screenshot: console output]
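With a KeySelector, grouping on multiple fields means returning a composite key. A sketch using an anonymous class (a plain lambda would need a type hint here, because the Tuple2 key type is erased):

KeyedStream<Tuple3<String, String, Double>, Tuple2<String, String>> keyedByProvinceCity =
        provinceCityMoney.keyBy(new KeySelector<Tuple3<String, String, Double>, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(Tuple3<String, String, Double> t) {
                // the composite key: (province, city)
                return Tuple2.of(t.f0, t.f1);
            }
        });
keyedByProvinceCity.sum(2).print();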

5. Reduce

An aggregation operator: sum can only add, while reduce lets you define the combining logic yourself, multiplication for example.

public class Reduce01 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8828);

        // a lambda expression instead of new MapFunction:
        // one input element maps to one output element
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(w -> Tuple2.of(w, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // a tuple is a special collection: fields are indexed from 0, up to Tuple25
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        // aggregate: combine two values of the same key into one
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduced = keyed.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {

                String key = value1.f0;
                Integer count1 = value1.f1;
                Integer count2 = value2.f1;

                Integer counts = count1 + count2;

                return Tuple2.of(key, counts);
            }
        });

        reduced.print();
        env.execute();
    }
}

Result:

[screenshot: console output]
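To back the claim that reduce is not limited to addition: the same reduce as a one-line lambda, plus a product variant (a sketch; no returns(...) hint is needed, because a ReduceFunction's output type equals its input type):

SingleOutputStreamOperator<Tuple2<String, Integer>> summed =
        keyed.reduce((v1, v2) -> Tuple2.of(v1.f0, v1.f1 + v2.f1));

SingleOutputStreamOperator<Tuple2<String, Integer>> multiplied =
        keyed.reduce((v1, v2) -> Tuple2.of(v1.f0, v1.f1 * v2.f1));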

6. Max

Example input:

spark,10
spark,20
hadoop,10

Each incoming value is compared with the history for its key: the maximum count is kept and the smaller one is discarded.

public class Max01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                String words = fields[0];
                int num = Integer.parseInt(fields[1]);
                return Tuple2.of(words, num);
            }
        });

        // group by word
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        // keep the maximum count per key; smaller values are discarded
        SingleOutputStreamOperator<Tuple2<String, Integer>> max = keyed.max(1);

        max.print();
        env.execute();
    }
}

Result:

[screenshot: console output]
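Worth noting: max(1) updates only the aggregated field and keeps the remaining fields from the first record seen for that key, whereas maxBy(1) emits the complete record that contains the maximum. As a sketch:

// emits the whole record holding the largest count, not just the count
SingleOutputStreamOperator<Tuple2<String, Integer>> maxBy = keyed.maxBy(1);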

7. Sink

7.1 print()

Printing to the console is also a sink.

7.2 CSV

public class AddSink01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("192.168.52.200", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                String words = fields[0];
                int num = Integer.parseInt(fields[1]);

                return Tuple2.of(words, num);
            }
        });

        // group by word
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        SingleOutputStreamOperator<Tuple2<String, Integer>> max = keyed.max(1);

        /**
         * A custom sink, e.g. for writing to a database or to disk.
         * A sink does not need to return anything.
         */
        max.addSink(new SinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                System.out.println(value);
            }
        });

        /**
         * Write to disk.
         * Writing a CSV file requires a tuple type.
         */
        max.writeAsCsv("F:\\out222", FileSystem.WriteMode.OVERWRITE);
        max.print();
        env.execute();
    }
}
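On recent Flink releases writeAsCsv is deprecated; the FileSink connector is the usual replacement. A minimal sketch (the path reuses the example directory above; rolling and formatting are left at their defaults):

FileSink<Tuple2<String, Integer>> fileSink = FileSink
        .forRowFormat(new Path("F:\\out222"), new SimpleStringEncoder<Tuple2<String, Integer>>("UTF-8"))
        .build();
max.sinkTo(fileSink);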

7.3 RedisSink

public class MyRedisSink extends RichSinkFunction<Turbine> {
    // the Redis connection, initialized in open(); transient so it is not serialized
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String host = params.getRequired("redis.host");
        //String password = params.getRequired("redis.pwd");
        int db = params.getInt("redis.db", 0);
        jedis = new Jedis(host, 6379, 5000);
        //jedis.auth(password);
        jedis.select(db);
    }

    @Override
    public void invoke(Turbine value, Context context) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        // write to Redis as a hash: key = word, field = province, value = counts
        jedis.hset(value.word, value.province, String.valueOf(value.counts));
    }

    @Override
    public void close() throws Exception {
        super.close();
        jedis.close();
    }
}
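MyRedisSink reads its settings from the global job parameters, so they must be registered on the environment before the job starts. A minimal wiring sketch (the turbines stream and the command-line arguments are assumptions, not part of the original job):

ParameterTool params = ParameterTool.fromArgs(args);  // e.g. --redis.host 192.168.52.200 --redis.db 0
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);

// ... build a DataStream<Turbine> named turbines ...
turbines.addSink(new MyRedisSink());
env.execute("RedisSink01");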

7.4 MySqlSink

public class MySqlSink extends RichSinkFunction<GW200001> {

    // best to keep the connection out of serialization, hence transient
    private transient Connection conn = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // create the MySQL connection
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/turbine?characterEncoding=UTF-8", "root", "123456");
        System.out.println("connection obtained");
    }

    @Override
    public void invoke(GW200001 gw200001, Context context) throws Exception {
        // update / insert / statistics logic
        PreparedStatement pstm = null;
        try {
            pstm = conn.prepareStatement("insert into gw200001(wt_number, wt_date_time) values(?,?)");
            pstm.setString(1, gw200001.wt_number);
            pstm.setString(2, gw200001.wt_date_time);

            // execute the SQL (alternatives: executeUpdate(), executeQuery())
            System.out.println("executing sql");
            pstm.execute();
        } finally {
            if (pstm != null) {
                pstm.close();
                System.out.println("statement closed");
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        conn.close();
        System.out.println("connection closed");
    }
}
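One design note: the PreparedStatement above is created and closed on every invoke() call, so each record pays the statement-preparation cost. A common variant, sketched here, prepares the statement once in open() and reuses it:

private transient Connection conn;
private transient PreparedStatement pstm;

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/turbine?characterEncoding=UTF-8", "root", "123456");
    // prepared once per parallel instance, reused for every record
    pstm = conn.prepareStatement("insert into gw200001(wt_number, wt_date_time) values(?,?)");
}

@Override
public void invoke(GW200001 gw200001, Context context) throws Exception {
    pstm.setString(1, gw200001.wt_number);
    pstm.setString(2, gw200001.wt_date_time);
    pstm.executeUpdate();
}

@Override
public void close() throws Exception {
    super.close();
    if (pstm != null) pstm.close();
    if (conn != null) conn.close();
}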

