文章目录
- Flink 系列文章
- 一、maven依赖
- 二、示例:通过API查询表和使用窗口函数的查询
- 1、示例:基本的查询表
- 2、示例:Tumble窗口查询表
- 3、示例:Over窗口查询表
本文通过Table API 进行基本表的查询操作,同时给出了Tumble和Over窗口的查询示例。
本文除了maven依赖外,没有其他依赖。
本文需要有kafka的运行环境。
一、maven依赖
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
二、示例:通过API查询表和使用窗口函数的查询
本示例实现了Tumble和Over窗口查询。
如果使用sql的窗口函数参考:
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
1、示例:基本的查询表
本示例仅仅是基本的查询表用法,包含2种方式,即Table API 与 SQL的方式。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;
import com.google.common.collect.Lists;
/**
* @author alanchan
*
*/
public class TestTableAPIDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// testQueryTableBySQL();
testQueryTableByAPI();
}
static void testQueryTableByAPI() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// SQL 创建输入表
String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n" +
" `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" +
" `partition` BIGINT METADATA VIRTUAL,\r\n" +
" `offset` BIGINT METADATA VIRTUAL,\r\n" +
" `user_id` BIGINT,\r\n" +
" `item_id` BIGINT,\r\n" +
" `behavior` STRING\r\n" +
") WITH (\r\n" +
" 'connector' = 'kafka',\r\n" +
" 'topic' = 'user_behavior',\r\n" +
" 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" +
" 'properties.group.id' = 'testGroup',\r\n" +
" 'scan.startup.mode' = 'earliest-offset',\r\n" +
" 'format' = 'csv'\r\n" +
");";
tenv.executeSql(sourceSql);
// 2、将流转成table
String sql = "select * from Alan_KafkaTable ";
Table streamTable = tenv.sqlQuery(sql);
// 3、API查询
Table resultQuery = streamTable
.groupBy($("user_id"), $("behavior"))
.select($("user_id"), $("behavior"), $("behavior").count().as("count(*)"));
// 4、将流转成table
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultQuery, Row.class);
// 5、sink
resultDS.print();
// 6、执行
env.execute();
// kafka中输入测试数据
// 1,1001,login
// 1,2001,p_read
// 程序运行控制台输入如下
// 14> (true,+I[1, p_read, 1])
// 3> (true,+I[1, login, 1])
}
static void testQueryTableBySQL() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// SQL 创建输入表
String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n" +
" `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" +
" `partition` BIGINT METADATA VIRTUAL,\r\n" +
" `offset` BIGINT METADATA VIRTUAL,\r\n" +
" `user_id` BIGINT,\r\n" +
" `item_id` BIGINT,\r\n" +
" `behavior` STRING\r\n" +
") WITH (\r\n" +
" 'connector' = 'kafka',\r\n" +
" 'topic' = 'user_behavior',\r\n" +
" 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" +
" 'properties.group.id' = 'testGroup',\r\n" +
" 'scan.startup.mode' = 'earliest-offset',\r\n" +
" 'format' = 'csv'\r\n" +
");";
tenv.executeSql(sourceSql);
// 查询
String sql = "select * from Alan_KafkaTable ";
Table resultQuery = tenv.sqlQuery(sql);
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultQuery, Row.class);
// 6、sink
resultDS.print();
// 7、执行
env.execute();
// kafka中输入测试数据
// 1,1001,login
// 1,2001,p_read
// 程序运行控制台输入如下
// 11> (true,+I[16:32:19.923, 0, 0, 1, 1001, login])
// 11> (true,+I[16:32:32.258, 0, 1, 1, 2001, p_read])
}
}
2、示例:Tumble窗口查询表
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;
import com.google.common.collect.Lists;
/**
* @author alanchan
*
*/
public class TestTableAPIDemo {
final static List<User> userList = Arrays.asList(
new User(1L, "alan", 18, 1698742358391L),
new User(2L, "alan", 19, 1698742359396L),
new User(3L, "alan", 20, 1698742360407L),
new User(4L, "alanchan", 28, 1698742361409L),
new User(5L, "alanchan", 29, 1698742362424L));
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
testQueryTableWithWindwosByAPI();
}
static void testQueryTableWithWindwosByAPI() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
)
;
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("age"),$("rt").rowtime());
// tumble
Table result = usersTable
.filter(
and(
// $("name").equals("alanchan"),
// $("age").between(1, 20),
$("name").isNotNull(),
$("age").isGreaterOrEqual(19)
)
)
.window(Tumble.over(lit(1).hours()).on($("rt")).as("hourlyWindow"))// 定义滚动窗口并给窗口起一个别名
.groupBy($("name"),$("hourlyWindow"))// 窗口必须出现的分组字段中
.select($("name"),$("name").count().as("count(*)"), $("hourlyWindow").start(), $("hourlyWindow").end())
;
result.printSchema();
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
resultDS.print();
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class User {
private long id;
private String name;
private int age;
private Long rowtime;
}
}
3、示例:Over窗口查询表
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;
import com.google.common.collect.Lists;
/**
* @author alanchan
*
*/
public class TestTableAPIDemo {
final static List<User> userList = Arrays.asList(
new User(1L, "alan", 18, 1698742358391L),
new User(2L, "alan", 19, 1698742359396L),
new User(3L, "alan", 20, 1698742360407L),
new User(4L, "alanchan", 28, 1698742361409L),
new User(5L, "alanchan", 29, 1698742362424L));
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class User {
private long id;
private String name;
private int age;
private Long rowtime;
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
testQueryTableWithWindwosByAPI();
}
static void testQueryTableWithWindwosByAPI() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
)
;
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("age"),$("rt").rowtime());
// over
usersTable
.window(Over.partitionBy($("name")).orderBy($("rt")).preceding(unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE)).as("hourlyWindow"))
.select($("id"), $("rt"), $("id").count().over($("hourlyWindow")).as("count_t"))
.execute()
.print()
;
env.execute();
}
}
以上,本文通过Table API 进行基本表的查询操作,同时给出了Tumble和Over窗口的查询示例。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)