1.1 JDBC SQL Connector
The JDBC connector allows a Flink program to read data from, and write data to, any relational database that provides a JDBC driver.
If a primary key is defined in the Flink SQL table's DDL, the connector writes the stream to the database in upsert mode, in which case the stream may contain UPDATE/DELETE records. Otherwise it writes in append mode, and the stream may only contain INSERT records.
DDL example:
CREATE TABLE MyUserTable (
    id BIGINT,
    name STRING,
    age INT,
    status BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'users'
);
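For comparison, a minimal sketch of the append-mode variant (same illustrative connection settings as above): omitting the PRIMARY KEY clause makes the sink operate in append mode.

CREATE TABLE MyUserTable (
    id BIGINT,
    name STRING,
    age INT,
    status BOOLEAN
    -- no PRIMARY KEY: the sink runs in append mode and
    -- accepts only INSERT records
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'users'
);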
JDBC Connector parameters:
- connector: the connector type; here, jdbc
- url: the JDBC database URL
- table-name: the name of the table in the database
- lookup.cache.max-rows: the maximum number of rows in the lookup cache
- lookup.cache.ttl: the maximum time to live of each row in the lookup cache
- username: the database username
- password: the database password
- driver: the JDBC driver class. Note: registering the driver can usually be omitted, but the automatically resolved class is com.mysql.jdbc.Driver. Flink CDC 2.1.0 requires MySQL driver version 8 or above, and in mysql-connector-java 8.x that class is deprecated; the new driver class is com.mysql.cj.jdbc.Driver (see the sketch after the warning below). If this parameter is omitted, the console prints the following warning:
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'.
The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
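To silence the warning, the new driver class can be pinned explicitly in the WITH clause. A minimal sketch (table name and connection settings are illustrative):

CREATE TABLE users_jdbc (
    id BIGINT,
    name STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'users',
    -- pin the MySQL 8.x driver class instead of relying on
    -- automatic resolution, which picks the deprecated class
    'driver' = 'com.mysql.cj.jdbc.Driver'
);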
1.2 Lookup Cache
The JDBC connector can be used in a temporal join as a lookup source (also known as a dimension table). Currently, only synchronous lookup mode is supported.
By default, the lookup cache is not enabled. You can enable it by setting both lookup.cache.max-rows and lookup.cache.ttl.
The lookup cache improves the performance of temporal joins against the JDBC connector. With the cache disabled, every request is sent to the external database. With the cache enabled, each process (i.e. each TaskManager) holds a cache: Flink looks up the cache first and only queries the external database on a cache miss, updating the cache with the returned rows. The oldest rows are evicted when the cache reaches the maximum number of cached rows (lookup.cache.max-rows), and a row is also evicted once it exceeds the maximum time to live (lookup.cache.ttl). The cached rows may therefore not be the latest; you can set lookup.cache.ttl to a smaller value to get fresher data, but this increases the number of requests sent to the database, so it is a trade-off between throughput and correctness.
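A minimal sketch of enabling the cache on a JDBC lookup table (option values are illustrative; a smaller TTL trades extra database requests for fresher rows):

CREATE TABLE dim_users (
    id BIGINT,
    name STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'users',
    -- both options must be set, or the cache stays disabled
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '10min'
);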
1.3 Lookup Join
A lookup join is typically used to enrich a Flink SQL table with data queried from an external system. It requires that one table (the main table) has a processing-time attribute, while the other table (the dimension table) is backed by a lookup connector.
A lookup join performs dimension enrichment, and dimension data is time-sensitive, so a time field is needed to identify the version of the data. Flink therefore requires a processing-time attribute as the version field. Here we call the PROCTIME() function to obtain the system time and use it as the processing-time attribute.
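The join syntax used by the complete code in section 1.4, shown in isolation: the FOR SYSTEM_TIME AS OF clause on the processing-time attribute pt marks student_desc as a lookup (dimension) table.

SELECT t1.id, t1.name, t2.subject, t2.score
FROM student AS t1
-- FOR SYSTEM_TIME AS OF pins each lookup to the row's processing time;
-- the lookup goes to the cache first if the cache is enabled
JOIN student_desc FOR SYSTEM_TIME AS OF t1.pt AS t2
ON t1.id = t2.id;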
(1) Without lookup.cache.ttl (the cache stays disabled, so every lookup hits the database)
First input:
[root@bigdata ~]# nc -lk 9999
1001,zs
+----+-------------+---------+-------------+-------------+
| op | id | name | subject | score |
+----+-------------+---------+-------------+-------------+
| +I | 1001 | zs | 美丽动人 | 111 |
Second input, after updating the score in the database; with no cache, the lookup returns the new score:
[root@bigdata ~]# nc -lk 9999
1001,zs
1001,zs
+----+-------------+---------+-------------+-------------+
| op | id | name | subject | score |
+----+-------------+---------+-------------+-------------+
| +I | 1001 | zs | 美丽动人 | 111 |
| +I | 1001 | zs | 美丽动人 | 100 |
(2) With lookup.cache.ttl set (the cache is enabled)
First input:
[root@bigdata ~]# nc -lk 9999
1001,zs
+----+-------------+---------+-------------+-------------+
| op | id | name | subject | score |
+----+-------------+---------+-------------+-------------+
| +I | 1001 | zs | 美丽动人 | 111 |
Second input, after updating the score in the database; the cached row has not expired yet, so the stale score is returned:
[root@bigdata ~]# nc -lk 9999
1001,zs
1001,zs
+----+-------------+---------+-------------+-------------+
| op | id | name | subject | score |
+----+-------------+---------+-------------+-------------+
| +I | 1001 | zs | 美丽动人 | 111 |
| +I | 1001 | zs | 美丽动人 | 111 |
1.4 Complete Code
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class LookUpJoin {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Read from the socket, parse each line into a Student, and convert
        // the stream into a dynamic table with a processing-time attribute.
        DataStreamSource<String> socketTextStream = env.socketTextStream("bigdata", 9999);
        SingleOutputStreamOperator<Student> map = socketTextStream.map(line -> {
            String[] split = line.split(",");
            return new Student(Integer.parseInt(split[0]), split[1]);
        });
        Table table = tableEnv.fromDataStream(map, $("id"), $("name"), $("pt").proctime());
        tableEnv.createTemporaryView("student", table);
        //tableEnv.executeSql("select * from student").print();

        // Register the JDBC dimension table with the lookup cache enabled.
        tableEnv.executeSql("" +
                "CREATE TABLE student_desc ( " +
                "    id INT, " +
                "    subject STRING, " +
                "    score INT " +
                ") WITH ( " +
                "    'connector' = 'jdbc', " +
                "    'url' = 'jdbc:mysql://bigdata100:3306/test', " +
                "    'username' = 'root', " +
                "    'password' = '123456', " +
                "    'lookup.cache.max-rows' = '10', " +
                "    'lookup.cache.ttl' = '100000 ms', " +
                "    'driver' = 'com.mysql.cj.jdbc.Driver', " +
                "    'table-name' = 'student_desc' " +
                ")");

        // Lookup join: FOR SYSTEM_TIME AS OF t1.pt queries the dimension
        // table (or the cache) at each row's processing time.
        tableEnv.executeSql("" +
                "select " +
                "    t1.id, " +
                "    t1.name, " +
                "    t2.subject, " +
                "    t2.score " +
                "from student t1 " +
                "join student_desc FOR SYSTEM_TIME AS OF t1.pt as t2 " +
                "on t1.id = t2.id")
                .print();
    }

    // Student POJO (assumed shape, inferred from the map function above).
    // Flink's POJO rules require a public class, a public no-argument
    // constructor, and public fields (or getters/setters).
    public static class Student {
        public Integer id;
        public String name;

        public Student() {
        }

        public Student(Integer id, String name) {
            this.id = id;
            this.name = name;
        }
    }
}