Here I record, step by step, how to read and write Hive tables from Flink. Note that the DDL for partitioned and non-partitioned Hive tables differs, so the two cases are recorded separately below.
Base environment
Hive-3.1.3
Flink-1.17.1
Basic setup and preparation
1. Copy the dependency jars into the flink/lib directory
cp flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar /usr/sft/flink-1.17.1/lib/
cp mysql-connector-j-8.1.0.jar /usr/sft/flink-1.17.1/lib/
2. Swap the planner dependency (the recommended setup for Hive integration)
mv /usr/sft/flink-1.17.1/opt/flink-table-planner_2.12-1.17.1.jar /usr/sft/flink-1.17.1/lib/
mv /usr/sft/flink-1.17.1/lib/flink-table-planner-loader-1.17.1.jar /usr/sft/flink-1.17.1/opt/
3. Start the Hive Metastore
nohup hive --service metastore 2>&1 &
4. Start the Flink cluster and the sql-client
yarn-session.sh -d -nm flink-cluster
sql-client.sh embedded -s yarn-session
5. Create a Hive catalog in the Flink sql-client
CREATE CATALOG hive WITH (
'type' = 'hive',
'default-database' = 'sty',
'hive-conf-dir' = '/usr/sft/hive-3.1.3/conf'
);
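To confirm the catalog is registered, a quick sanity check in the sql-client (show catalogs, use catalog, and show databases are all standard Flink SQL statements; switching catalogs here is the same step repeated in the next section):
show catalogs;
use catalog hive;
-- should list the databases from the Hive Metastore, including sty
show databases;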
Reading and writing non-partitioned tables
1. Create the table in Hive and insert data
create table behavior(
username string,
behavior string
);
insert into behavior values('lisi','buy'),('zhangsan','read');
2. Switch to the hive catalog
use catalog hive;
3. Insert and query data in the Flink sql-client (the syntax is the same as regular SQL)
insert into behavior values('sisi','buy'),('tracy','read');
select * from behavior;
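Optionally, the sql-client display and execution mode can be adjusted before querying; both options exist in Flink 1.17, and this is just a convenience sketch:
-- print results directly in the terminal instead of the interactive result viewer
set 'sql-client.execution.result-mode' = 'tableau';
-- run queries as finite batch jobs rather than continuous streaming jobs
set 'execution.runtime-mode' = 'batch';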
Reading and writing partitioned tables
This differs from the non-partitioned case mainly at the DDL level; for reference, see this post: https://www.jianshu.com/p/295066a24092
1. Create a partitioned table in Hive and insert data
create table userinfo(
name string,
age int
)
partitioned by (dt string)
stored as orc
tblproperties(
'sink.partition-commit.trigger' = 'partition-time',
'sink.partition-commit.policy.kind' = 'metastore,success-file',
'partition.time-extractor.timestamp-pattern' = '$dt 00:00:00',
'sink.partition-commit.delay' = '10 s'
);
insert into table userinfo partition(dt='2023-10-26') values('zhangsan',23);
insert into table userinfo partition(dt='2023-10-26') values('lisi',26),('wangwu',27);
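To verify on the Hive side that the partition was created (show partitions is standard HiveQL):
show partitions userinfo;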
Note: if a proper sink.partition-commit.policy.kind is not configured in tblproperties when the table is created, inserting data from the Flink sql-client fails with the following error:
Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Streaming write to partitioned hive table `hive`.`sty`.`userInfo` without providing a commit policy. Make sure to set a proper value for sink.partition-commit.policy.kind
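The policy can also be attached after table creation; a minimal fix sketch, assuming the table from above already exists (alter table ... set tblproperties is plain HiveQL, run from Hive):
-- commit by registering the partition in the metastore and writing a _SUCCESS marker file
alter table userinfo set tblproperties(
'sink.partition-commit.policy.kind' = 'metastore,success-file'
);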
2. Insert and query data in the Flink sql-client
insert into userinfo partition(dt='2023-10-24') values('tracy',26),('lily',27);
select * from userinfo;
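For completeness, the Hive connector can also consume a partitioned table as an unbounded stream, picking up new partitions as they are committed. A sketch using dynamic table options (streaming-source.enable, streaming-source.monitor-interval, and streaming-source.consume-start-offset are Hive connector options; the offset value here is illustrative):
select * from userinfo
/*+ OPTIONS(
'streaming-source.enable' = 'true',
'streaming-source.monitor-interval' = '1 min',
'streaming-source.consume-start-offset' = 'dt=2023-10-24'
) */;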