Flink SQL Custom Function - String Splitting
1. Add Dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.13.6</version>
</dependency>
Note: the dependency above is not sufficient on its own. To run the test locally, you also need the dependencies that allow the Table API to execute in a local environment; a sketch follows below.
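For reference, a commonly used set of additional dependencies for running the Table API locally on Flink 1.13.6 looks roughly like this (the `_2.12` Scala suffix is an assumption; adjust it to the Scala version of your build):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.6</version>
</dependency>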
2. Define the Custom UDF
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

@FunctionHint(output = @DataTypeHint("ROW<`str_value` STRING>"))
public class SplitFunction extends TableFunction<Row> {
    // Implement the eval method: split the input string and emit each substring
    public void eval(String str, String regex) {
        if (str != null) {
            // Split the input string using the given regular expression
            for (String s : str.split(regex)) {
                // Emit one row per substring via collect(...)
                collect(Row.of(s));
            }
        }
    }
}
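As a side note, the same table function can also be invoked through the Table API instead of SQL. A minimal sketch, assuming a Table named table with a column f0 like the one created in the test below:
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

// Call SplitFunction inline (no prior registration needed); leftOuterJoinLateral
// keeps input rows even when the function emits nothing, mirroring the
// LEFT JOIN LATERAL ... ON TRUE used in the SQL query below
Table result = table
        .leftOuterJoinLateral(call(SplitFunction.class, $("f0"), ","))
        .select($("f0"), $("str_value"));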
3. Test with a main Method
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public static void main(String[] args) throws Exception {
    // Set up the streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // Create the streaming table environment
    EnvironmentSettings environmentSettings = EnvironmentSettings
            .newInstance()
            .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);
    // Create a data stream and convert it into a table
    DataStreamSource<String> dataStream = env.fromElements("hello,world");
    Table table = tableEnv.fromDataStream(dataStream);
    table.printSchema(); // Print the table schema
    // Register the table as a temporary view
    tableEnv.createTemporaryView("MyTable", table);
    // Register the custom function SplitFunction
    tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
    // Run a SQL query that calls SplitFunction to split the string
    Table result = tableEnv.sqlQuery(
            "SELECT f0, str_value " +
            "FROM MyTable " +
            "LEFT JOIN LATERAL TABLE(SplitFunction(f0, ',')) ON TRUE");
    // Convert the result back to a data stream and print it
    tableEnv.toDataStream(result, Row.class).print();
    // Execute the Flink job
    env.execute("Flink SQL SplitFunction Test");
}
4. Execution Result
(
  `f0` STRING
)
+I[hello,world, hello]
+I[hello,world, world]
Process finished with exit code 0