49. Flink Java Lambda Expression Examples



Table of Contents

  • Flink series articles
  • I. Java Lambda Expressions
  • 1. Maven dependencies
  • 2. Examples and limitations
  • 3. Comparing non-lambda and lambda implementations


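This article shows how to write Java lambda expressions in Flink and compares implementations with and without them.
The examples were run inside an IDE and do not depend on any external environment.
The article has three parts: the dependencies for the sample code, simple examples and their limitations, and a comparison of lambda and non-lambda implementations.
The examples were run on Flink 1.17.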

I. Java Lambda Expressions

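Java 8 introduced several new language features designed for faster and clearer coding. The most important of them, the lambda expression, opened the door to functional programming. Lambda expressions let you implement and pass functions concisely, without declaring additional (anonymous) classes.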
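
As a small illustration of that point, the sketch below writes the same squaring function twice in plain Java, once as an anonymous class and once as a lambda. It uses only java.util.function.Function from the JDK and no Flink classes; the class name LambdaVsAnonymous and the field names are made up for this example.

```java
import java.util.function.Function;

public class LambdaVsAnonymous {

    // Anonymous-class form: the pre-Java-8 way to pass a function around.
    public static final Function<Integer, Integer> SQUARE_ANONYMOUS =
            new Function<Integer, Integer>() {
                @Override
                public Integer apply(Integer i) {
                    return i * i;
                }
            };

    // Lambda form: same behavior, no extra class declaration.
    public static final Function<Integer, Integer> SQUARE_LAMBDA = i -> i * i;

    public static void main(String[] args) {
        System.out.println(SQUARE_ANONYMOUS.apply(3)); // prints 9
        System.out.println(SQUARE_LAMBDA.apply(3));    // prints 9
    }
}
```

Both fields can be passed wherever a Function<Integer, Integer> is expected; the lambda form simply drops the class boilerplate.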

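Flink supports lambda expressions for all operators of the Java API. However, whenever a lambda expression uses Java generics, you need to declare the type information explicitly.

This document shows how to use lambda expressions and describes their current limitations.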

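For a general introduction to the Flink API, see the DataStream API programming guide, covered in these articles:
48. Flink DataStream API Programming Guide (1): DataStream getting-started example

48. Flink DataStream API Programming Guide (2): DataStream sources, transformations, sinks, and debugging

48. Flink DataStream API Programming Guide (3): complete version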

1. Maven dependencies

<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.8</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
	</dependencies>

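2. Examples and limitations

The following example shows how to implement a simple inline map() function that uses a lambda expression to square its input.

The types of the map() function's input i and of its output do not need to be declared, because the Java compiler infers them.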

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestLambdaDemo {

	public static void test1() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// source
		// transformation
		// sink
		env.fromElements(1, 2, 3)
				// return the square of i (the print prefix "平方结果:" means "squared result:")
				.map(i -> i * i).print("平方结果:");
		// execute
		env.execute();
	}

	public static void main(String[] args) throws Exception {
		test1();
	}

}

// Output (line order can vary between runs)
平方结果::5> 9
平方结果::4> 4
平方结果::3> 1

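Because OUT is Integer rather than a generic type, Flink can automatically extract the result type information from the implementation of the method signature OUT map(IN value).

For a function such as flatMap(), however, the signature void flatMap(IN value, Collector<OUT> out) is compiled by the Java compiler into void flatMap(IN value, Collector out). This makes it impossible for Flink to infer the output type information automatically.

Flink will most likely throw an exception like the following: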
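
The effect of erasure on lambdas can be observed with plain Java reflection. The sketch below is an illustration only: it uses java.util.function.Function from the JDK rather than any Flink interface, and the class and method names are made up for this example. It checks whether the generic type arguments of the implemented interface are still visible at runtime for an anonymous class and for a lambda.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class LambdaErasureDemo {

    // An anonymous class records its generic interface in the class file.
    public static final Function<Integer, String> ANONYMOUS =
            new Function<Integer, String>() {
                @Override
                public String apply(Integer i) {
                    return "a" + i;
                }
            };

    // The class generated for a lambda only implements the raw interface.
    public static final Function<Integer, String> LAMBDA = i -> "a" + i;

    /** Returns true if the first implemented interface still carries type arguments. */
    public static boolean keepsTypeArguments(Object function) {
        Type first = function.getClass().getGenericInterfaces()[0];
        return first instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        System.out.println(keepsTypeArguments(ANONYMOUS)); // prints true
        System.out.println(keepsTypeArguments(LAMBDA));    // prints false
    }
}
```

This is the same kind of missing information that the 'Collector' message in Flink's exception refers to: with a lambda there is no recorded type argument to read back.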

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
    In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
    An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
    Otherwise the type has to be specified explicitly using type information.

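In this case, the type information must be specified explicitly; otherwise the output is treated as type Object, which leads to inefficient serialization.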

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *
 */
public class TestLambdaDemo {

	public static void test2() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// source
		DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
		// transformation

		// the Collector type must be declared
		input.flatMap((Integer number, Collector<String> out) -> {
			StringBuilder builder = new StringBuilder();
			for (int i = 0; i < number; i++) {
				builder.append("a");
				out.collect(builder.toString());
			}
		})
				// provide the type information explicitly
				.returns(Types.STRING)
				// sink
				.print();
				
		// execute
		env.execute();
	}

	public static void main(String[] args) throws Exception {
		test2();
	}

}

// Output
3> a
4> a
5> a
4> aa
5> aa
5> aaa

A similar problem occurs when map() returns a generic type. In the example below, the method signature Tuple2<Integer, Integer> map(Integer value) is erased to Tuple2 map(Integer value).

public static void test3() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.fromElements(1, 2, 3).map(i -> Tuple2.of(i, i)) // no information about the fields of Tuple2
				.print();
		// execute
		env.execute();
	}

This method fails with the following exception:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'test3(TestLambdaDemo.java:59)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:543)
	at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1237)
	at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:937)
	at org.tablesql.filesystem.TestLambdaDemo.test3(TestLambdaDemo.java:60)
	at org.tablesql.filesystem.TestLambdaDemo.main(TestLambdaDemo.java:66)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:568)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:154)
	at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:575)
	at org.tablesql.filesystem.TestLambdaDemo.test3(TestLambdaDemo.java:59)
	... 1 more

In general, these problems can be solved in several ways, for example the four shown below:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestLambdaDemo2 {

	public static void test1() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// use an explicit ".returns(...)"
		env.fromElements(1, 2, 3).map(i -> Tuple2.of(i, i)).returns(Types.TUPLE(Types.INT, Types.INT)).print();

		// execute
		env.execute();
	}

	public static void test2() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// use a class instead
		env.fromElements(1, 2, 3).map(new AlanMapper()).print();
		// execute
		env.execute();
	}

	public static class AlanMapper implements MapFunction<Integer, Tuple2<Integer, Integer>> {
		@Override
		public Tuple2<Integer, Integer> map(Integer i) {
			return Tuple2.of(i, i);
		}
	}

	public static void test3() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// use an anonymous class instead
		env.fromElements(1, 2, 3).map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {

			@Override
			public Tuple2<Integer, Integer> map(Integer value) throws Exception {
				return Tuple2.of(value, value);
			}
		}).print();

		// execute
		env.execute();
	}

	public static void test4() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// alternatively, use a Tuple subclass, as in this example
		env.fromElements(1, 2, 3).map(i -> new AlanDoubleTuple(i, i)).print();
		// execute
		env.execute();
	}

	public static class AlanDoubleTuple extends Tuple2<Integer, Integer> {
		// the default constructor is required
		public AlanDoubleTuple() {
		}

		public AlanDoubleTuple(int f0, int f1) {
			this.f0 = f0;
			this.f1 = f1;
		}
	}

	public static void main(String[] args) throws Exception {
		test1();
		test2();
		test3();
		test4();
	}

}
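
Why does a named class or a Tuple subclass help? A class that extends or implements a parameterized type keeps those type arguments in its class file, so they can be read back by reflection. The sketch below shows this with plain Java only. Pair and IntPair are hypothetical stand-ins for Tuple2 and AlanDoubleTuple, and the helper method is made up for this example; it is not how Flink's type extractor is implemented, only an illustration of the information that is available to it.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

public class SuperclassTypeDemo {

    /** Hypothetical stand-in for a generic tuple type such as Tuple2. */
    public static class Pair<A, B> {
        public A f0;
        public B f1;
    }

    /** Concrete subclass: the type arguments are fixed in the class declaration. */
    public static class IntPair extends Pair<Integer, Integer> {
    }

    /** Returns the type arguments of the generic superclass, or an empty array if there are none. */
    public static Type[] superclassTypeArguments(Class<?> clazz) {
        Type superType = clazz.getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments();
        }
        return new Type[0];
    }

    public static void main(String[] args) {
        // prints [class java.lang.Integer, class java.lang.Integer]
        System.out.println(Arrays.toString(superclassTypeArguments(IntPair.class)));
        // prints []: Pair itself extends Object, which has no type arguments
        System.out.println(Arrays.toString(superclassTypeArguments(Pair.class)));
    }
}
```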

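3. Comparing non-lambda and lambda implementations

The examples below contrast the lambda and non-lambda ways of implementing the same functionality; which one to use is a matter of personal preference. The User class uses Lombok annotations, so the Lombok dependency (not listed in the pom above) must also be on the classpath.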

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
	private int id;
	private String name;
	private String pwd;
	private String email;
	private int age;
	private double balance;
}


import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TransformationMapDemo {

	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// source

		// transformation
		mapPartitionFunction8(env);
		// sink
		// execute
		env.execute();

	}

	// build a list and output each number multiplied by 2; anonymous inner class implementation
	public static void mapFunction1(StreamExecutionEnvironment env) throws Exception {

		List<Integer> data = new ArrayList<Integer>();
		for (int i = 1; i <= 10; i++) {
			data.add(i);
		}
		DataStreamSource<Integer> source = env.fromCollection(data);

		SingleOutputStreamOperator<Integer> sink = source.map(new MapFunction<Integer, Integer>() {

			@Override
			public Integer map(Integer inValue) throws Exception {
				return inValue * 2;
			}
		});

//		source.print();
		sink.print();
	}

	// build a list and output each number multiplied by 2; lambda implementation
	public static void mapFunction2(StreamExecutionEnvironment env) throws Exception {
		List<Integer> data = new ArrayList<Integer>();
		for (int i = 1; i <= 10; i++) {
			data.add(i);
		}
		DataStreamSource<Integer> source = env.fromCollection(data);
		SingleOutputStreamOperator<Integer> sink = source.map(i -> 2 * i);
		sink.print();
	}

	// build the User data source
	public static DataStreamSource<User> source(StreamExecutionEnvironment env) {
		DataStreamSource<User> source = env.fromCollection(
				Arrays.asList(
						new User(1, "alan1", "1", "1@1.com", 12, 1000), 
						new User(2, "alan2", "2", "2@2.com", 19, 200),
						new User(3, "alan1", "3", "3@3.com", 28, 1500), 
						new User(5, "alan1", "5", "5@5.com", 15, 500), 
						new User(4, "alan2", "4", "4@4.com", 30, 400))
				);
		return source;
	}

	// lambda implementation: balance * 2 and age + 5 for each user object
	public static SingleOutputStreamOperator<User> mapFunction3(StreamExecutionEnvironment env) throws Exception {
		DataStreamSource<User> source = source(env);

		SingleOutputStreamOperator<User> sink = source.map((MapFunction<User, User>) user -> {
			User user2 = user;
			user2.setAge(user.getAge() + 5);
			user2.setBalance(user.getBalance() * 2);

			return user2;
		});
		sink.print();
		return sink;
	}

	// lambda implementation: after balance * 2 and age + 5, keep the records with balance >= 2000 and age >= 20
	public static SingleOutputStreamOperator<User> mapFunction4(StreamExecutionEnvironment env) throws Exception {

		SingleOutputStreamOperator<User> sink = mapFunction3(env).filter(user -> user.getBalance() >= 2000 && user.getAge() >= 20);
		sink.print();
		return sink;
	}

	// lambda implementation: on top of the filter above (balance >= 2000 and age >= 20), collect the records with balance >= 3000 via flatMap
	public static SingleOutputStreamOperator<User> mapFunction5(StreamExecutionEnvironment env) throws Exception {

		SingleOutputStreamOperator<User> sink = mapFunction4(env).flatMap((FlatMapFunction<User, User>) (user, out) -> {
			if (user.getBalance() >= 3000) {
				out.collect(user);
			}
		}).returns(User.class);

		sink.print();
		return sink;
	}

	// data partitioning example
	public static void mapPartitionFunction6(StreamExecutionEnvironment env) throws Exception {
		DataStreamSource<User> source = source(env);

		DataStream<User> userTemp = source.map(user -> {
			User user2 = user;
			user2.setAge(user.getAge() + 5);
			return user2;
		}).returns(User.class);

//		public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
//			return setConnectionType(new CustomPartitionerWrapper<>(clean(partitioner),
//					clean(keySelector)));
//		}

		DataStream<User> sink = userTemp.partitionCustom(new Partitioner<Integer>() {

			public int partition(Integer key, int numPartitions) {
				// the println labels "分区数" and "分区数2" mean "number of partitions" and "chosen partition"
				System.out.println("分区数:" + numPartitions);
				if (key < 20)
					numPartitions = 0;
				else if (key >= 20 && key < 30)
					numPartitions = 1;
				else if (key >= 0)
					numPartitions = 2;
				System.out.println("分区数2:" + numPartitions);
				return numPartitions;
			}
		}, new KeySelector<User, Integer>() {

			@Override
			public Integer getKey(User value) throws Exception {
				return value.getAge();
			}
		});
		sink.map((MapFunction<User, User>) user -> {
			System.out.println("当前线程ID:" + Thread.currentThread().getId() + ",user:" + user.toString());
			return user;
		}).returns(User.class);
//		System.out.println("parallelism: " + sink.getParallelism());
		// Output: three partitions, split by age ("当前线程ID" means "current thread ID")
//		当前线程ID:138,user:User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=1500.0)
//		当前线程ID:136,user:User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=1000.0)
//		当前线程ID:138,user:User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=400.0)
//		当前线程ID:140,user:User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=200.0)
//		当前线程ID:140,user:User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=500.0)
		sink.print();
	}

	// data partitioning example using lambdas
	public static void mapPartitionFunction7(StreamExecutionEnvironment env) throws Exception {
		DataStreamSource<User> source = source(env);

		DataStream<User> userTemp = source.map(user -> {
			User user2 = user;
			user2.setAge(user.getAge() + 5);
			return user2;
		}).returns(User.class);

		DataStream<User> sink = userTemp.partitionCustom((key, numPartitions) -> {
			if (key < 20)
				numPartitions = 0;
			else if (key >= 20 && key < 30)
				numPartitions = 1;
			else if (key >= 0)
				numPartitions = 2;
			return numPartitions;
		}, user -> user.getAge());
		sink.print();
	}

	// partition by odd and even user id; id = 1 gets its own partition
	public static void mapPartitionFunction8(StreamExecutionEnvironment env) throws Exception {
		DataStreamSource<User> source = source(env);
		DataStream<User> sink = source.partitionCustom(new CusPartitioner(), user -> user.getId());
		// partitioning in action; the output is shown below ("当前线程ID" means "current thread ID")
//		1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//		当前线程ID:90,user:User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
//		当前线程ID:89,user:User(id=3, name=alan1, pwd=3, email=3@3.com, age=28, balance=1500.0)
//		2> User(id=3, name=alan1, pwd=3, email=3@3.com, age=28, balance=1500.0)
//		当前线程ID:88,user:User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//		当前线程ID:89,user:User(id=5, name=alan1, pwd=5, email=5@5.com, age=15, balance=500.0)
//		1> User(id=4, name=alan2, pwd=4, email=4@4.com, age=30, balance=400.0)
//		3> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
//		当前线程ID:88,user:User(id=4, name=alan2, pwd=4, email=4@4.com, age=30, balance=400.0)
//		2> User(id=5, name=alan1, pwd=5, email=5@5.com, age=15, balance=500.0)
		sink.map((MapFunction<User, User>) user -> {
			System.out.println("当前线程ID:" + Thread.currentThread().getId() + ",user:" + user.toString());
			return user;
		}).returns(User.class);
		sink.print();
	}

	public static class CusPartitioner implements Partitioner<Integer> {

		@Override
		public int partition(Integer key, int numPartitions) {
			if (key == 1)
				numPartitions = 2;
			else if (key % 2 == 0) {
				numPartitions = 0;
			} else {
				numPartitions = 1;
			}
			return numPartitions;
		}

	}
}
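
The partitioning rule in CusPartitioner can be checked in isolation, without starting a Flink job. The sketch below copies that rule into a plain static method and adds a range check. The class PartitionSketch and the checked() helper are hypothetical and not part of the article's code; the check reflects my understanding that a custom partitioner should return an index between 0 and numPartitions - 1, which means the examples above need a parallelism of at least 3.

```java
public class PartitionSketch {

    /** Same rule as CusPartitioner: id 1 goes to partition 2, even ids to 0, other odd ids to 1. */
    public static int partitionById(int id) {
        if (id == 1) {
            return 2;
        }
        return (id % 2 == 0) ? 0 : 1;
    }

    /** Hypothetical guard: rejects an index outside [0, numPartitions). */
    public static int checked(int partition, int numPartitions) {
        if (partition < 0 || partition >= numPartitions) {
            throw new IllegalArgumentException(
                    "partition " + partition + " is out of range for " + numPartitions + " partitions");
        }
        return partition;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed parallelism for this sketch
        for (int id : new int[] {1, 2, 3, 5, 4}) {
            System.out.println("id " + id + " -> partition " + checked(partitionById(id), numPartitions));
        }
    }
}
```

With ids 1 to 5 this gives partition 2 for id 1, partition 0 for ids 2 and 4, and partition 1 for ids 3 and 5, which matches the grouping in the sample output, where the "n>" prefix is the partition index plus one.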

In summary, this article showed how to write Java lambda expressions in Flink and compared implementations with and without them.

