Flink: Common Transformation Operators, User-Defined Functions, and Partitioning Strategies

5.4 Transformation Operators
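
All of the examples in this section operate on an Event POJO with a user, a url, and a timestamp field. The class itself is defined earlier in the series; a minimal sketch of the assumed shape (the field names and the toString format match the code and printed results below):

import java.sql.Timestamp;

public class Event {
    // Flink POJOs require public fields (or getters/setters) and a public no-arg constructor
    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{user='" + user + "', url='" + url + "', timestamp=" + new Timestamp(timestamp) + "}";
    }
}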

5.4.1 map (Mapping)

  1. Static inner class implementing the interface

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformMapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from elements
        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L));
        // Transform the stream: extract the user field
        SingleOutputStreamOperator<String> result = stream.map(new MyMapper());
        result.print();

        env.execute();
    }

    // Custom implementation of the MapFunction interface
    // 1. Custom static inner class implementing the interface
    public static class MyMapper implements MapFunction<Event, String> {

        @Override
        public String map(Event value) throws Exception {
            return value.user;
        }
    }
}

  2. Anonymous class implementation

// 2. Anonymous class implementing the MapFunction interface
SingleOutputStreamOperator<String> result2 = stream.map(new MapFunction<Event, String>() {
    @Override
    public String map(Event value) throws Exception {
        return value.user;
    }
});

  3. Lambda expression

A single expression does it all:

// 3. Pass in a lambda expression
SingleOutputStreamOperator<String> result3 = stream.map(data -> data.user);

5.4.2 filter (Filtering)

  1. Static inner class implementing the interface

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformFilterTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from elements
        // 1. Custom static inner class implementing the interface
        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L));

        SingleOutputStreamOperator<Event> result1 = stream.filter(new MyFilter());

        result1.print();
        env.execute();
    }

    public static class MyFilter implements FilterFunction<Event> {

        @Override
        public boolean filter(Event value) throws Exception {
            return value.user.equals("Mary");
        }
    }
}

  2. Anonymous class implementation

SingleOutputStreamOperator<Event> result2 = stream.filter(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.user.equals("Mary");
    }
});

  3. Lambda expression

SingleOutputStreamOperator<Event> result3 = stream.filter(data -> data.user.equals("Mary"));

5.4.3 flatMap (Flat Mapping)

One-to-many: a single input record can produce zero, one, or many output records.

  1. Static inner class implementing the interface

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformFlatMapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from elements
        // 1. Custom static inner class implementing the interface
        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L));

        stream.flatMap(new MyFlatMap()).print();
        env.execute();
    }

    public static class MyFlatMap implements FlatMapFunction<Event, String> {

        @Override
        // Results are emitted via the Collector's collect method.
        // If collect is skipped for some records, flatMap can also act as a filter.
        public void flatMap(Event value, Collector<String> out) throws Exception {
            out.collect(value.user);
            out.collect(value.url);
            out.collect(value.timestamp.toString());
        }
    }
}

  2. Anonymous class implementation (sketch below)
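
The anonymous-class variant is not spelled out in the original; it follows the same pattern as the anonymous classes used for map and filter above. A minimal sketch (unlike the lambda below, an anonymous class keeps its generic type information, so no returns() call is needed):

// 2. Anonymous class implementing the FlatMapFunction interface
stream.flatMap(new FlatMapFunction<Event, String>() {
    @Override
    public void flatMap(Event value, Collector<String> out) throws Exception {
        out.collect(value.user);
        out.collect(value.url);
    }
}).print();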

  3. Lambda expression

stream.flatMap((Event value, Collector<String> out) -> {
    if (value.user.equals("Mary")) {
        out.collect(value.url);
    } else if (value.user.equals("Bob")) {
        out.collect(value.user);
        out.collect(value.url);
        out.collect(value.timestamp.toString());
    }
}).returns(new TypeHint<String>() {}) // Java type erasure removes the lambda's generic return type, so it must be declared explicitly
        .print();
env.execute();
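
The original does not show the output of this lambda version; for the three-element stream above, the expected result would be the following (Mary contributes only her url, Bob all three fields, Alice nothing, which illustrates the filter-like behavior):

./home
Bob
./cart
2000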

5.4.4 keyBy (Logical Partitioning)

  1. Analysis


Strictly speaking, keyBy is not a transformation operator: it does not return a SingleOutputStreamOperator (which extends DataStream) but merely attaches key-based logical partitioning and returns a KeyedStream. Only KeyedStream offers aggregation methods such as sum, max, and reduce.
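
A minimal sketch of this type distinction, assuming the Event POJO above:

// keyBy returns a KeyedStream (element type Event, key type String),
// not a SingleOutputStreamOperator
KeyedStream<Event, String> keyedStream = stream.keyBy(data -> data.user);

// Aggregations such as sum/max/min/reduce are only defined on KeyedStream
keyedStream.max("timestamp"); // compiles
// stream.max("timestamp");   // does not compile: DataStream has no max()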

  2. Using the simple aggregation operators

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformSingleAggTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Bob", "./home", 3500L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        // Group by key, then aggregate: extract each user's most recent visit
        stream.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event value) throws Exception {
                return value.user;
            }
        }).max("timestamp")
                .print("max:");

        // maxBy
        stream.keyBy(data -> data.user)
                .maxBy("timestamp")
                .print("maxBy:");

        env.execute();
    }
}

The difference between max and maxBy: max computes the maximum of the specified field only, leaving all other fields as they were in the first record, while maxBy emits the complete record that contains the maximum value, so all fields follow it.

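A hypothetical trace to illustrate: once Bob's records ("Bob", "./cart", 2000L) and ("Bob", "./prod?id=1", 3300L) have both arrived, the two variants would emit:

max:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:03.3}
maxBy:> Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:03.3}

With max, only the timestamp field is updated while url still comes from Bob's first record; maxBy replaces the entire record with the one holding the maximum timestamp.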

  3. reduce (Reduction aggregation)


reduce takes the elements one by one and keeps combining the running result with each new element according to a given rule, eventually producing a single aggregated value. For example, summing 1, 2, 3, 4 proceeds as 1 → 3 → 6 → 10.

  • Example: finding the most active user

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformReduceTest {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        // Find the record with the highest visit count: first count visits, then take the max
        // 1. Count each user's visit frequency
        SingleOutputStreamOperator<Tuple2<String, Long>> clicksByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Event value) throws Exception {
                return Tuple2.of(value.user, 1L);
            }
        }).keyBy(data -> data.f0)
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    // value1 is the accumulated count so far, value2 is the incoming record (count 1)
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                });

        // 2. Select the currently most active user
        // All records share the same constant key, so they all go to one partition
        SingleOutputStreamOperator<Tuple2<String, Long>> result = clicksByUser.keyBy(data -> "key")
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return value1.f1 > value2.f1 ? value1 : value2;
                    }
                });
        result.print();
        env.execute();
    }
}

  • Result (note that reduce emits an updated aggregate for each incoming record)

(Mary,1)
(Bob,1)
(Alice,1)
(Bob,2)
(Alice,2)
(Bob,3)
(Bob,4)
(Bob,5)

5.4.5 User-Defined Functions

  1. Rich function classes


RichMapFunction is an abstract class: it extends the abstract rich-function base class AbstractRichFunction, implements the MapFunction interface, and declares an abstract map method.

  2. Code

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformRichFunctionTest {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        stream.map(new MyRichMapper()).setParallelism(2).print();
        env.execute();
    }

    // A custom rich function class
    // RichMapFunction is an abstract class
    public static class MyRichMapper extends RichMapFunction<Event, Integer> {

        // Lifecycle method inherited from the AbstractRichFunction base class
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // State can be set up here, enabling aggregation-style logic;
            // getRuntimeContext() gives access to the subtask index
            System.out.println("open lifecycle called, subtask " + getRuntimeContext().getIndexOfThisSubtask() + " starting");
        }

        // The abstract map method declared by RichMapFunction
        @Override
        public Integer map(Event value) throws Exception {
            return value.url.length();
        }

        @Override
        public void close() throws Exception {
            super.close();
            System.out.println("close lifecycle called, subtask " + getRuntimeContext().getIndexOfThisSubtask() + " finishing");
        }
    }
}

  3. Result (open and close run once per parallel subtask; with parallelism 2, each prints twice)

open lifecycle called, subtask 0 starting
open lifecycle called, subtask 1 starting
close lifecycle called, subtask 0 finishing
close lifecycle called, subtask 1 finishing
6
13
13
11

5.4.6 Physical Partitioning

Unlike keyBy, physical partitioning lets you explicitly specify the partitioning strategy.

  1. shuffle (random partitioning)
  • Code

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformPartitionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        stream.shuffle().print().setParallelism(4);
        env.execute();
    }
}

  • Result (the assignment is random, so it differs from run to run)

1> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
4> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
2> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
3> Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:03.3}
2> Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:03.5}
4> Event{user='Bob', url='./prod?id=3', timestamp=1970-01-01 08:00:04.2}
1> Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.0}
2> Event{user='Bob', url='./prod?id=2', timestamp=1970-01-01 08:00:03.8}

Process finished with exit code 0

  2. rebalance (round-robin partitioning)

// 2. rebalance: round-robin partitioning
stream.rebalance().print().setParallelism(4);
env.execute();

  • Result

2> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
1> Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:03.3}
3> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
1> Event{user='Bob', url='./prod?id=3', timestamp=1970-01-01 08:00:04.2}
4> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
3> Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:03.5}
2> Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.0}
4> Event{user='Bob', url='./prod?id=2', timestamp=1970-01-01 08:00:03.8}

  3. rescale (rescaling partitioning)

The difference from rebalance is that rescale round-robins only within its own group of downstream subtasks rather than across all of them.

The goal is to cut network transfer overhead and save resources.

  • Code

// 3. rescale: rescaling partitioning
// fromElements has parallelism 1, so build a custom parallel source instead;
// the rich-function variant provides access to the runtime context
env.addSource(new RichParallelSourceFunction<Integer>() {

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        for (int i = 1; i <= 8; i++) {
            // Send even numbers to parallel subtask 0 and odd numbers to subtask 1
            if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                ctx.collect(i);
            }
        }
    }

    @Override
    public void cancel() {
    }
}).setParallelism(2)
        .rescale()
        .print()
        .setParallelism(4);
env.execute();

  • Result

2> 4
1> 2
3> 1
3> 5
4> 3
1> 6
2> 8
4> 7

Process finished with exit code 0

Odd numbers 1, 3, 5, 7 come out of print subtasks 3 and 4; even numbers 2, 4, 6, 8 come out of subtasks 1 and 2: each source subtask round-robins only over its own half of the sinks.

  • With rebalance instead, 1, 3, 5, 7 (and likewise the even numbers) are distributed fully round-robin across all four subtasks 1 through 4:

env.addSource(new RichParallelSourceFunction<Integer>() {

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        for (int i = 1; i <= 8; i++) {
            // Send even numbers to parallel subtask 0 and odd numbers to subtask 1
            if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                ctx.collect(i);
            }
        }
    }

    @Override
    public void cancel() {
    }
}).setParallelism(2)
        .rebalance()
        .print()
        .setParallelism(4);
env.execute();

  • Result

3> 6
1> 2
2> 4
4> 8
3> 5
4> 7
2> 3
1> 1

  4. broadcast (broadcast partitioning)
  • Concept

Each record is broadcast to every parallel downstream subtask.

  • Code

stream.broadcast().print().setParallelism(4);

  • Result

1> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
4> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
3> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
2> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
3> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
4> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
1> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
2> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}

Each record is output by all four partitions in parallel.

  5. global (global partitioning)
  • Concept

All records are routed to a single downstream subtask (the first one, index 0).

  • Code

// 5. global partitioning
stream.global().print().setParallelism(4);
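
No result is shown in the original; since global() routes every record to the first downstream subtask, all output lines would be expected to carry the 1> prefix (hypothetical, first records only):

1> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
1> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
1> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
...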

  • Note

Funneling all records into a single subtask can put heavy pressure on the system.

  6. partitionCustom (custom repartitioning)
  • Code

// 6. Custom repartitioning
env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
        // Pass in a Partitioner and a KeySelector (analogous to keyBy)
        .partitionCustom(new Partitioner<Integer>() {
            @Override
            // The returned int (0 or 1 here) is the index of the physical partition;
            // with this strategy only the first and second partitions receive data
            public int partition(Integer key, int numPartitions) {
                return key % 2;
            }

            // Integer in, Integer out
        }, new KeySelector<Integer, Integer>() {

            @Override
            public Integer getKey(Integer value) throws Exception {
                // Group by the value itself
                return value;
            }
        })
        .print().setParallelism(4);

env.execute();

  • Result

1> 2
1> 4
2> 1
1> 6
2> 3
1> 8
2> 5
2> 7

Process finished with exit code 0

Since the partitioner returns key % 2, only the first two of the four print subtasks ever receive data.
