5.4 转换算子
5.4.1 map(映射)
- 静态内部类实现接口
public class TransfromMapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Build a bounded source from in-memory elements.
        DataStreamSource<Event> source = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L));

        // Transformation: extract the user field from every Event.
        SingleOutputStreamOperator<String> userNames = source.map(new MyMapper());
        userNames.print();

        env.execute();
    }

    // Custom MapFunction implementation.
    // Option 1: a static nested class implementing the interface.
    public static class MyMapper implements MapFunction<Event, String> {
        @Override
        public String map(Event value) throws Exception {
            return value.user;
        }
    }
}
- 匿名类实现
// 2. Anonymous inner class implementing the MapFunction interface.
SingleOutputStreamOperator<String> result2 = stream.map(new MapFunction<Event, String>() {
@Override
public String map(Event value) throws Exception {
// Extract the user field of each incoming Event.
return value.user;
}
});
- lambda表达式
一步到位
// 3. Pass a lambda expression directly — the most concise form.
SingleOutputStreamOperator<String> result3 = stream.map(data -> data.user);
5.4.2 过滤filter
- 静态内部类实现接口
public class TransformFilterTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//从元素读取数据
//1.使用自定义静态内部类实现接口
DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice","./prod?id=100",3000L));
SingleOutputStreamOperator<Event> result1 = stream.filter(new MyFilter());
});
result1.print();
env.execute();
}
public static class MyFilter implements FilterFunction<Event>{
@Override
public boolean filter(Event value) throws Exception {
return value.user.equals("Mary");
}
}
}
- 匿名类实现
SingleOutputStreamOperator<Event> result2 = stream.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.user.equals("Mary");
}
- lambda表达式
// 3. Lambda form: keep only events generated by user "Mary".
SingleOutputStreamOperator<Event> result3 = stream.filter(data -> data.user.equals("Mary"));
5.4.3 flatmap(扁平映射)
一对多
- 静态内部类实现接口
public class TransformFlatMapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Bounded in-memory source.
        // 1. Flat-map via a custom static nested class implementing the interface.
        DataStreamSource<Event> source = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L));

        source.flatMap(new MyFlatMap()).print();
        env.execute();
    }

    // One-to-many mapping: each Event expands into three String records.
    public static class MyFlatMap implements FlatMapFunction<Event, String> {
        @Override
        // Output is emitted through the Collector's collect() method;
        // emitting nothing for some inputs makes flatMap behave like a filter.
        public void flatMap(Event value, Collector<String> out) throws Exception {
            out.collect(value.user);
            out.collect(value.url);
            out.collect(value.timestamp.toString());
        }
    }
}
- 匿名类实现
略
- lambda表达式
// 3. Lambda form: emit only the url for Mary, all three fields for Bob,
// and nothing for anyone else (flatMap doubling as a filter).
stream.flatMap((Event value,Collector<String> out) -> {
if(value.user.equals("Mary")){
out.collect(value.url);
}else if(value.user.equals("Bob")){
out.collect(value.user);
out.collect(value.url);
out.collect(value.timestamp.toString());
}
}).returns(new TypeHint<String>() {})// generic output type must be declared explicitly (lambda generics are erased)
.print();
env.execute();
5.4.4 keyby逻辑分区
- 分析
严格来说 keyBy 不是转换算子:它不返回 SingleOutputStreamOperator(DataStream 的子类),而只是按 key 追加逻辑分区,返回的是 KeyedStream(键控流);只有 KeyedStream 上才能调用 sum、max、reduce 等聚合方法进行聚合操作
- 简单聚合算子使用
public class TransformSingleAggTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Bounded in-memory source of click events.
DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice","./prod?id=100",3000L),
new Event("Bob","./prod?id=1",3300L),
new Event("Bob", "./home", 3500L),
new Event("Alice","./prod?id=200",3000L),
new Event("Bob","./prod?id=2",3800L),
new Event("Bob","./prod?id=3",4200L));
// After keying by user, aggregate: extract each user's most recent visit.
// max() updates only the "timestamp" field; other fields keep the first record's values.
stream.keyBy(new KeySelector<Event, String>() {
@Override
public String getKey(Event value) throws Exception {
return value.user;
}
}).max("timestamp")
.print("max:");
// maxBy(): the whole record whose "timestamp" is largest is kept.
stream.keyBy(data->data.user)
.maxBy("timestamp")
.print("maxBy:");
env.execute();
}
}
max 和 maxBy 的区别:max 只对指定字段取最大值,其他字段保留第一条数据的值;maxBy 则整条记录跟随最大值字段一起更新,返回最大值所在的完整记录
- 归约聚合reduce
把集合中的每一个数据拿出来,然后按照一定的规则不停地归约,最终得到一个唯一的归约聚合结果
- 访问量最多的用户
public class TransformReduceTest{
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Bounded in-memory source of click events.
DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Alice", "./prod?id=200", 3000L),
new Event("Bob", "./home", 3500L),
new Event("Bob", "./prod?id=2", 3800L),
new Event("Bob", "./prod?id=3", 4200L));
// Find the most-visited user: first count visits, then take the max.
// 1. Count each user's visits: map each event to (user, 1) and reduce by key.
SingleOutputStreamOperator<Tuple2<String, Long>> clicksByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event value) throws Exception {
return Tuple2.of(value.user, 1L);
}
}).keyBy(data -> data.f0)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
// values1 is the running accumulator; values2 is the new (user, 1) record.
public Tuple2<String, Long> reduce(Tuple2<String, Long> values1, Tuple2<String, Long> values2) throws Exception {
return Tuple2.of(values1.f0, values1.f1 + values2.f1);
}
});
// 2. Select the currently most active user.
// All records share the same constant key, so they land in one partition.
SingleOutputStreamOperator<Tuple2<String, Long>> result = clicksByUser.keyBy(data -> "key")
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// Keep whichever of the two counts is larger.
return value1.f1 > value2.f1 ? value1 : value2;
}
});
result.print();
env.execute();
}
}
- 结果
(Mary,1)
(Bob,1)
(Alice,1)
(Bob,2)
(Alice,2)
(Bob,3)
(Bob,4)
(Bob,5)
5.4.5 用户自定义函数
- 富函数类
RichMapFunction是一个抽象类,继承自AbstractRichFunction抽象富函数类,同时实现了MapFunction接口
并且有抽象的map方法
- 代码
public class TransformRichFunctionTest {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Bounded in-memory source of click events.
DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Alice", "./prod?id=200", 3000L),
new Event("Bob", "./home", 3500L),
new Event("Bob", "./prod?id=2", 3800L),
new Event("Bob", "./prod?id=3", 4200L));
// Parallelism 2 on the map => open()/close() run once per subtask (2 times).
stream.map(new MyRichMapper()).setParallelism(2).print();
env.execute();
}
// A custom rich function implementation.
// RichMapFunction is an abstract class (extends AbstractRichFunction, implements MapFunction).
public static class MyRichMapper extends RichMapFunction<Event,Integer>{
// Lifecycle method inherited from AbstractRichFunction, called once per subtask on startup.
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Custom state could be initialized here to support aggregation-style logic.
// getRuntimeContext() gives access to the subtask index.
System.out.println("open声明周期被调用"+getRuntimeContext().getIndexOfThisSubtask()+"号任务启动");
}
// The abstract map() of RichMapFunction: return the url length of each event.
@Override
public Integer map(Event value) throws Exception {
return value.url.length();
}
// Lifecycle method called once per subtask on shutdown.
@Override
public void close() throws Exception {
super.close();
System.out.println("close声明周期被调用"+getRuntimeContext().getIndexOfThisSubtask()+"号任务启动");
}
}
}
- 结果
open声明周期被调用0号任务启动
open声明周期被调用1号任务启动
close声明周期被调用0号任务启动
close声明周期被调用1号任务启动
6
13
13
11
5.4.6 物理分区
与 keyBy 不同的是,物理分区可以显式指定分区策略
- shuffle随机分区
- 代码
public class TransformPartitionTest {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Bounded in-memory source of click events.
DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Alice", "./prod?id=200", 3000L),
new Event("Bob", "./home", 3500L),
new Event("Bob", "./prod?id=2", 3800L),
new Event("Bob", "./prod?id=3", 4200L));
// 1. shuffle(): randomly distribute records across the 4 print subtasks.
stream.shuffle().print().setParallelism(4);
env.execute();
}
}
- 结果
1> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
4> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
2> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
3> Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:03.3}
2> Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:03.5}
4> Event{user='Bob', url='./prod?id=3', timestamp=1970-01-01 08:00:04.2}
1> Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.0}
2> Event{user='Bob', url='./prod?id=2', timestamp=1970-01-01 08:00:03.8}
Process finished with exit code 0
- 轮询分区
// 2. Round-robin partitioning: records are cycled across the 4 print subtasks.
stream.rebalance().print().setParallelism(4);
env.execute();
- 结果
2> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
1> Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:03.3}
3> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
1> Event{user='Bob', url='./prod?id=3', timestamp=1970-01-01 08:00:04.2}
4> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
3> Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:03.5}
2> Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.0}
4> Event{user='Bob', url='./prod?id=2', timestamp=1970-01-01 08:00:03.8}
- rescale重缩放分区
与 rebalance 的区别是:rescale 只在各自对应的下游小组内轮询分发,而不是面向全部下游分区轮询
目的是为了减少网络传输损耗,减少资源
- 代码
// 3. rescale(): rescale partitioning.
// The previous DataStreamSource ran with parallelism 1, so build a parallel custom
// source instead; the rich variant exposes the runtime context (subtask index).
env.addSource(new RichParallelSourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
for(int i =1 ;i<=8;i++){
// Route even numbers to subtask 0 and odd numbers to subtask 1.
if(i%2== getRuntimeContext().getIndexOfThisSubtask()){
ctx.collect(i);
}
}
}
@Override
public void cancel() {
}
}).setParallelism(2)
.rescale()
.print()
.setParallelism(4);
env.execute();
- 结果
2> 4
1> 2
3> 1
3> 5
4> 3
1> 6
2> 8
4> 7
Process finished with exit code 0
1,3,5,7对应输出的是3,4的分区
2,4,6,8对应输出的是1,2分区
- 如果是rebalance的话,就是1,3,5,7是1,2,3,4完全轮询分区
// Same two-subtask source, but with rebalance(): every upstream subtask
// round-robins over ALL 4 downstream subtasks instead of only its own group.
env.addSource(new RichParallelSourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
for(int i =1 ;i<=8;i++){
// Route even numbers to subtask 0 and odd numbers to subtask 1.
if(i%2== getRuntimeContext().getIndexOfThisSubtask()){
ctx.collect(i);
}
}
}
@Override
public void cancel() {
}
}).setParallelism(2)
.rebalance()
.print()
.setParallelism(4);
env.execute();
}
}
- 结果
3> 6
1> 2
2> 4
4> 8
3> 5
4> 7
2> 3
1> 1
- 广播分区
- 概念
一个数据分发到所有并行子任务上
- 代码
// 4. broadcast(): every record is replicated to all 4 print subtasks.
stream.broadcast().print().setParallelism(4);
- 结果
1> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
4> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
3> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
2> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
3> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
4> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
1> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
2> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
一条数据四个分区全部并行输出
- 全局分区
- 概念
把所有数据分配到一个子任务上
- 代码
// 5. global(): send every record to the first downstream subtask only.
stream.global().print().setParallelism(4);
- 注意
会造成系统压力
- 自定义重分区
- 代码
// 6. Custom repartitioning.
env.fromElements(1,2,3,4,5,6,7,8)
// Pass a Partitioner plus a KeySelector (analogous to keyBy).
.partitionCustom(new Partitioner<Integer>() {
@Override
// The returned int (0 or 1 here) is the physical partition index.
// With this strategy only the first two partitions receive data.
public int partition(Integer key, int numPartitions) {
return key%2;
}
// KeySelector: Integer in, Integer out.
}, new KeySelector<Integer, Integer>() {
@Override
public Integer getKey(Integer value) throws Exception {
// Use the value itself as the key.
return value;
}
})
.print().setParallelism(4);
env.execute();
- 结果
1> 2
1> 4
2> 1
1> 6
2> 3
1> 8
2> 5
2> 7
Process finished with exit code 0
注:由于分区器返回 key%2(只有 0 或 1),所以只有第一个和第二个物理分区会输出信息