前言
在JDK8之后,经常可以见到lamda表达式的写法,通过各种lamda表达式可以简化代码的编写,让代码看起来更加简洁,比如大家熟知的 filter , map 等方法,前者可以对集合中的数据进行过滤,后者可以很方便的对对象数据进行字段的规整转换等操作
在flink中,也提供了类似的API操作,方便的对输入流数据进行处理,俗称:转换算子,下面对flink中常用的几个转换算子进行举例说明
1、Map操作
比如以读取下面的外部文件为例,在该文本文件中存在下面的内容
很明显,通过程序读取的时候,会一行行对数据进行解析,通过flink提供的map方法,可以将每一行读取的数据传入到map方法体中,然后可以根据实际情况,对当前行的数据做进一步处理
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SoureTestTransform {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//从环境的集合中获取数据
String path = "E:\\code-self\\flink_study\\src\\main\\resources\\sensor.txt";
DataStreamSource<String> dataStreamSource = env.readTextFile(path);
//做数据转换
SingleOutputStreamOperator<Object> outputStream = dataStreamSource.map(new MapFunction<String, Object>() {
@Override
public Object map(String value) throws Exception {
//在此对传入的数据进行进一步的逻辑处理
return value.length();
}
});
outputStream.print("result1_job").setParallelism(1);
System.out.println("================");
env.execute();
}
}
上面的需求是:打印输出每一行文本内容的长度,运行本段程序,观察控制台输出结果,
2、FlatMap操作
顾名思义,FlatMap为数据的扁平化操作,简单理解就是将数据进行打散操作,打散之后可以方便对数据进行汇聚、规约、计算等操作
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class SoureTestTransform {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//从环境的集合中获取数据
String path = "E:\\code-self\\flink_study\\src\\main\\resources\\sensor.txt";
DataStreamSource<String> dataStreamSource = env.readTextFile(path);
//用逗号切分字段
SingleOutputStreamOperator<String> result2 = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> collector) throws Exception {
String[] split = value.split(",");
for (String field : split) {
collector.collect(field);
}
}
});
result2.print("result2").setParallelism(1);
env.execute();
}
}
本例要实现的需求是,将读取的每一行内容,通过逗号切割后,将结果打印输出到控制台,当然,实际需求中,可以将分割后的数据包装到 Java 的bean对象中去
运行上面的程序,观察控制台的输出结果,
3、Filter操作
这个可以类比Java8中的filter,意思差不多,就是从一批数据中进行过滤
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class SoureTestTransform {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//从环境的集合中获取数据
String path = "E:\\code-self\\flink_study\\src\\main\\resources\\sensor.txt";
DataStreamSource<String> dataStreamSource = env.readTextFile(path);
//filter操作
SingleOutputStreamOperator<String> result3 = dataStreamSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
return s.startsWith("s1");
}
});
result3.print();
env.execute();
}
}
本例要实现的需求是,过滤出以 "s1"开始的数据,运行本段代码,观察控制台输出结果