Flink 函数
文章目录
相关博客:
Flink-函数 | 用户自定义函数(UDF)标量函数 | 表函数 | 聚合函数 | 表聚合函数
一、Flink Table API 和 SQL 内置函数
Flink Table API 和 SQL为用户提供了一组用于数据转换的内置函数。
SQL中支持的很多函数,Table API 和 SQL都已经做了实现
-
比较函数
-
SQL:
value1 = value2
value1 > value2
-
Table API
ANY1 === ANY2
ANY1 > ANY2
-
-
逻辑函数
-
SQL:
boolean1= boolean2
boolean IS FALSE
NOT boolean
boolean IS FALSE
NOT boolean
-
Table API
BOOLEAN1 || BOOLEAN2
BOOLEAN.isFalse
!BOOLEAN
-
-
算数函数
-
SQL:
numeric1 + numeric2
POWER(numeric1, numeric2)
-
Table API
NUMERIC1 + NUMERIC2
NUMERIC1.POWER(NUMERIC2)
-
-
字符串函数
-
SQL:
string1 + string2
UPPER(string)
CHAR_LENGTH(string)
-
Table API
STRING1 + STRING2
STRING.upperCase()
STRING.charLength()
-
-
时间函数
-
SQL:
DATE string
TIMESTAMP string
CURRENT_TIME
interval string range
-
Table API
STRING.toDate
STRING.toTimestamp
currentTime()
NUMERIC.days
-
-
聚合函数
-
SQL:
COUNT()
SUM(expression)
RANK()
ROW_NUMBER()
-
Table API
FIELD.count
FIELD.sum
-
二、用户自定义函数(UDF)
用户定义函数(User-defined Functions,UDF)是一个重要的特性,它们显著地扩展了查询的表达能力.
一些系统内置函数无法解决的需求,可以用UDF来自定义实现
在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用
函数通过调用 registerFunction()
方法在 TableEnvironment 中注册。当用户定义的函数被注册时,它被插入到 TableEnvironment 的函数目录中,这样Table API 或 SQL 解析器就可以识别并正确地解释它。
sql函数有两大类型:
- scalar Function类似于map,一对一
- Table Function类似与flatMap,一对多
2.1 标量函数(Scalar Functions)
定义标量函数,可以将0、1或多个标量值,映射到新的标量值
为了定义标量函数,必须扩展基类ScalarFunction
,并实现求值(eval)方法。
标量函数的行为由求值方法决定,求值方法必须public公开声明并命名为eval
例如:
package com.root.udf;
import com.root.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
/**
* @author Kewei
* @Date 2022/3/9 15:44
*/
public class UDFTest1_Scalar {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
});
Table table = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature");
// 自定义标量函数,实现求id的hash值
HashCode hashCode = new HashCode(23);
// 注册UDF函数
tableEnv.registerFunction("hashcode",hashCode);
// Table API
Table resultTable = table.select("id, ts, hashcode(id)");
// SQL
tableEnv.createTemporaryView("sensor",table);
Table resultSqlTable = tableEnv.sqlQuery("select id, ts, hashcode(id) from sensor");
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");
env.execute();
}
public static class HashCode **extends ScalarFunction**{
private int factor = 13;
public HashCode(int factor){
this.factor = factor;
}
public int eval(String id){
return id.hashCode() * 13;
}
}
}
2.2 表函数(Table Function)
用户定义的表函数,也可以将0、1或多个标量值作为输入参数,与标量函数不同,它可以返回任意数量的行作为输出,而不是单个值。
为了定义一个表函数,必须扩展TableFunction并实现求值方法。
同样的,表函数的行为由其求值方法决定,求值方法必须是 public 的,并命名为 eval。
例如:
package com.root.udf;
import com.root.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
/**
* @author Kewei
* @Date 2022/3/9 16:01
*/
public class UDFTest2_Table {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 自定义表函数,类似于flatMap
Spilt split = new Spilt("_");
// 注册表函数
tableEnv.registerFunction("split",split);
DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
Table table = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature");
tableEnv.createTemporaryView("sensor",table);
Table resultTable = table.joinLateral("split(id) as (word, length)").select("id, ts, word, length");
Table resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length " +
"from sensor, lateral table(split(id)) as splitid(word, length)");
tableEnv.toAppendStream(resultTable, Row.class).print("res");
tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
env.execute();
}
// 将id分割,并分别返回对应的长度
public static class Spilt extends TableFunction<Tuple2<String,Integer>>{
public String sep = ",";
public Spilt(String sep){
this.sep = sep;
}
public void eval(String str){
for (String s : str.split(sep)) {
collect(new Tuple2<>(s,s.length()));
}
}
}
}
2.3 聚合函数(Aggregate Function)
聚合,多对一,类似前面的窗口聚合
用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。
用户定义的聚合函数,是通过继承 AggregateFunction
抽象类实现的。
- AggregationFunction要求必须实现的方法
- createAccumulator
- accumulate
- getValue
- AggregationFunction的工作原理如下
- 首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构;可以通过调用
createAccumulator()
方法初始化累加器。 - 随后,对每一个输入行调用函数的
accumulate()
方法来更新累加器。 - 处理完所有行后,将调用
getValue()
方法来计算并返回最终结果。
- 首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构;可以通过调用
例如:
package com.root.udf;
import com.root.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;
/**
* @author Kewei
* @Date 2022/3/9 16:23
*/
public class UDFTest3_Aggra {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 实例一个求平均的函数
AvgTemp avgTemp = new AvgTemp();
// 注册一个UDAF
tableEnv.registerFunction("avgTemp",avgTemp);
Table table = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");
tableEnv.createTemporaryView("sensor",table);
// 以下两种Table API写法结果相同
Table result = table.groupBy("id").select("id, avgTemp(temp)");
Table result2 = table
.groupBy("id")
.aggregate("avgTemp(temp) as avgtemp")
.select("id, avgtemp");
// sql
Table resultSql = tableEnv.sqlQuery("select id,avgTemp(temp) from sensor group by id");
tableEnv.toRetractStream(result2, Row.class).print("res");
tableEnv.toRetractStream(resultSql,Row.class).print("sql");
env.execute();
}
// 计算平均温度
public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double,Integer>>{
@Override
public Double getValue(Tuple2<Double, Integer> value) {
return value.f0/value.f1;
}
@Override
public Tuple2<Double, Integer> createAccumulator() {
return new Tuple2<>(0.0,0);
}
public void accumulate(Tuple2<Double,Integer> acc, Double value){
acc.f0 += value;
acc.f1 += 1;
}
}
}
2.4 表聚合函数(Table Function)
用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表。
用户定义表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的。
-
AggregationFunction 要求必须实现的方法:
createAccumulator()
accumulate()
emitValue()
-
TableAggregateFunction 的工作原理如下:
- 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用
createAccumulator()
方法可以创建空累加器。 - 随后,对每个输入行调用函数的
accumulate()
方法来更新累加器。 - 处理完所有行后,将调用函数的
emitValue()
方法来计算并返回最终结果。
例如:
package com.root.udf; import com.root.SensorReading; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.types.Row; import org.apache.flink.util.Collector; /** * @author Kewei * @Date 2022/3/9 16:40 */ public class UDFTest4_TableAgg { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt"); SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)) { @Override public long extractTimestamp(SensorReading value) { return value.getTimestamp() * 1000L; } }); // 实例一个自定义聚合表函数 MyAggTable myAggTable = new MyAggTable(); // 注册自定义聚合表函数 tableEnv.registerFunction("myAgg",myAggTable); Table table = tableEnv.fromDataStream(dataStream, "id, temperature as temp, timestamp.rowtime as ts"); tableEnv.createTemporaryView("sensor",table); // 使用 Table result = table.groupBy("id") .flatAggregate("myAgg(temp) as (temp, rank)") .select("id, temp, rank"); // 表聚合函数 不支持 sql调用 tableEnv.toRetractStream(result, Row.class).print("res"); env.execute(); } // 创建一个类,保存排名第一和第二的温度 public static class AggTabTempAcc{ public Double highestTemp; public Double secondHighestTemp; public AggTabTempAcc(){ highestTemp = Double.MIN_VALUE; secondHighestTemp = Double.MIN_VALUE; } } // 创建一个TableAggregateFunction函数,用于统计出,同一id排名第一和第二的温度 public static class MyAggTable extends TableAggregateFunction<Tuple2<Double,Integer>,AggTabTempAcc>{ // 初始化累加器 @Override public AggTabTempAcc createAccumulator() { return new AggTabTempAcc(); } // 更新累加器 public void accumulate(AggTabTempAcc acc,Double temp){ if (temp > acc.highestTemp){ acc.secondHighestTemp = acc.highestTemp; acc.highestTemp = temp; } else if (temp > acc.secondHighestTemp){ acc.secondHighestTemp = temp; } } // 输出累加器 public void emitValue(AggTabTempAcc acc, Collector<Tuple2<Double, Integer>> col){ col.collect(new Tuple2<>(acc.highestTemp,1)); col.collect(new Tuple2<>(acc.secondHighestTemp,2)); } } }
- 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用