0
点赞
收藏
分享

微信扫一扫

翻译 | Flink读写Hive

书呆鱼 2022-04-07 阅读 101
flinkhive

Hive Read & Write #

使用“HiveCatalog”,Apache Flink可以用于统一处理Apache Hive表的“BATCH”和“STREAM”。这意味着Flink可以作为Hive的批处理引擎的高性能替代品,也可以用来不断地读写Hive表中的数据,以支持实时数据仓库应用程序。

Reading #

Flink支持“BATCH”和“STREAMING”模式从Hive读取数据。当作为“BATCH”应用程序运行时,Flink将在执行查询时,在表的状态上执行查询。“STREAMING”读取将持续地监视表,并在新数据可用时增量地获取新数据。缺省情况下,Flink将读取有边界的表。

流式读取同时支持使用分区表和非分区表。对于分区表,Flink将监视新分区的生成,并在可用时增量地读取它们。对于非分区表,Flink将监视文件夹中新文件的生成,并增量地读取新文件。

KeyDefaultTypeDescription
streaming-source.enablefalseBoolean是否启用流源。注意:请确保每个分区/文件应该被原子写入,否则读取器可能会得到不完整的数据。
streaming-source.partition.includeallString设置分区为读取的选项,支持的选项是“all”和“latest”,’ all '表示读取所有分区; ’ latest ‘表示按照’stream -source.partition.order’的顺序读取最新的分区。’ latest ‘仅在流Hive源表用作时态表时有效。默认选项为’ all '. Flink通过启用“stream -source”来支持临时连接最新的hive分区。启用和设置stream -source.partition。包括““最新”,同时,用户可以通过配置以下分区相关选项来分配分区比较顺序和数据更新间隔。
streaming-source.monitor-intervalNoneDuration连续监控分区/文件的时间间隔。注: hive流媒体读取的默认时间间隔为“1分钟”。hive流临时连接的默认时间间隔为’60分钟’, 这是因为有一个框架限制,在当前Hive流时间连接实现中,每个TM都会访问Hive metaStore,这可能会对metaStore产生压力,这将在未来得到改善。
streaming-source.partition-orderpartition-nameString流源的分区顺序, 支持创建时间, partition-time 和 partition-name. Create-time比较分区/文件的创建时间, 这不是Hive metaStore的分区创建时间, 但是文件系统中的文件夹/文件修改时间, 如果分区文件夹以某种方式更新,例如添加新文件到文件夹,它会影响数据的使用方式.分区时间比较从分区名提取的时间. 分区名比较分区名的字母顺序. 对于非分区表,这个值应该总是’create-time’. 缺省值为partition-name. 该选项与已弃用的选项’stream -source. consumer -order’相等。
streaming-source.consume-start-offsetNoneString开始使用流的偏移量。如何解析和比较偏移量取决于您的顺序。对于创建时间和分区时间,应该是一个时间戳字符串(yyyy-[m]m-[d]d [hh:mm:ss])。对于分区时间,将使用分区时间提取器从分区中提取时间。对于分区名称,是分区名称字符串(例如pt_year=2020/pt_mon=10/pt_day=01)。

SQL Hints可以在不改变Hive metastore中定义的情况下对Hive表进行配置。

SELECT * 
FROM hive_table 
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;

Notes

  • 监控策略是扫描当前位置路径下的所有目录/文件。分区过多可能导致性能下降。
  • 对于非分区表的流式读取要求将每个文件以原子方式写入目标目录。
  • 对于分区表的流读取要求每个分区应该在hive metastore视图中被原子地添加。否则,将使用添加到现有分区的新数据。
  • 流读取不支持Flink DDL中的水印语法。这些表不能用于窗口操作符。

Reading Hive Views #

Flink可以从Hive定义的视图中读取数据,但是有一些限制:

  1. Hive目录必须设置为当前目录,才能查询视图。这可以通过在表API中的’ tableEnv.useCatalog(…)‘或’ USE CATALOG…'在SQL客户端。
  2. Hive和Flink SQL有不同的语法,例如不同的保留关键字和字面值。确保视图的查询与Flink语法兼容。

Vectorized Optimization upon Read #

当满足以下条件时,Flink将自动使用向量化读取Hive表:

  • 格式:ORC或Parquet。
  • 没有复杂数据类型的列,如hive类型:List, Map, Struct, Union。

该功能默认启用。以下配置可能会禁用它。

table.exec.hive.fallback-mapred-reader=true

Source Parallelism Inference #

默认情况下,Flink将根据文件的数量和每个文件中的块数量推断出Hive阅读器的最佳并行度。

Flink可以灵活配置并行推理策略。你可以在’ TableConfig '中配置以下参数(注意,这些参数影响作业的所有来源):

KeyDefaultTypeDescription
table.exec.hive.infer-source-parallelismtrueBoolean如果为真,则根据拆分数推断源并行度。如果为false,则由config设置源的并行度。
table.exec.hive.infer-source-parallelism.max1000Integer设置源操作符的最大推断并行度。

Load Partition Splits #

多线程用于分割hive的分区。你可以使用“table. execute .hive.load-partition- partitions”。thread-num '来配置线程号。默认值为3,且配置值应大于0。

Temporal Table Join #

你可以使用一个Hive表作为一个动态表,然后一个流可以通过动态连接来关联这个Hive表。有关临时连接的更多信息,请参阅temporal join。

Flink支持处理时间时态连接Hive Table,处理时间时态连接总是连接最新版本的时态表。Flink支持分区表和Hive非分区表的时态连接,对于分区表,Flink支持自动跟踪Hive表的最新分区。

NOTE: Flink目前还不支持事件时间临时连接Hive表。

Temporal Join The Latest Partition #

对于一个随时间变化的分区表,我们可以将其作为一个无界流读取出来,如果每个分区都包含一个版本的完整数据,那么分区就可以作为时态表的一个版本,时态表的版本保留分区的数据。

Flink支持在处理时间时态连接时自动跟踪时态表的最新分区(版本),最新分区(版本)是由’ stream -source '定义的。这是Flink流应用程序作业中使用Hive表作为维度表的最常见的用户情况。

NOTE: 此功能只支持在Flink ’ STREAMING '模式。

下面演示展示了一个经典商业管道,维度表来自Hive,每天更新一次,一批管道工作或Flink工作,Kafka流来自实时在线业务数据或日志,需要加入丰富的维度表流。

-- Assume the data in hive table is updated per day, every day contains the latest and complete dimension data
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...
) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
  -- 使用默认的分区名顺序,每12小时加载最新的分区(最推荐和最方便的方式)
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-name',  -- option with default value, can be ignored.

  -- 使用分区文件create-time命令每12小时加载最新的分区
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.partition-order' = 'create-time',
  'streaming-source.monitor-interval' = '12 h'

  -- 使用分区时间顺序每12小时加载最新的分区
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-time',
  'partition.time-extractor.kind' = 'default',
  'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00' 
);

SET table.sql-dialect=default;
CREATE TABLE orders_table (
  order_id STRING,
  order_amount DOUBLE,
  product_id STRING,
  log_ts TIMESTAMP(3),
  proctime as PROCTIME()
) WITH (...);


-- streaming sql, kafka temporal join a hive dimension table. Flink will automatically reload data from the
-- configured latest partition in the interval of 'streaming-source.monitor-interval'.

SELECT * FROM orders_table AS o 
JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.product_id = dim.product_id;

Temporal Join The Latest Table #

对于一个Hive表,我们可以将它作为一个有界流读取出来。在这种情况下,Hive表只能在我们查询的时候跟踪它的最新版本。最新版本的table将保留所有Hive表的数据。

当执行临时连接时,最新的Hive表, Hive表将被缓存在Slot内存中,并且每条流记录都通过key与表连接,以决定是否找到匹配.使用最新的Hive表作为时态表不需要任何额外的配置. 可选择地, 可以用下面的属性来配置Hive表缓存的TTL.缓存过期后,Hive表会再次扫描,加载最新的数据.

KeyDefaultTypeDescription
lookup.join.cache.ttl60 minDuration在查找连接中构建表的缓存TTL(例如10分钟). 默认的TTL是60分钟. 注意: 该选项仅在查找有界的hive表源时有效, 如果你使用流式hive源作为临时表, 请使用“streaming-source.Monitor-interval '配置数据更新的时间间隔。

下面的演示演示了如何将hive表的所有数据加载为一个临时表。

-- 假设hive表中的数据被批处理管道覆盖.
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...
) TBLPROPERTIES (
  'streaming-source.enable' = 'false',           -- 选项的默认值,可以忽略.
  'streaming-source.partition.include' = 'all',  -- 选项的默认值,可以忽略.
  'lookup.join.cache.ttl' = '12 h'
);

SET table.sql-dialect=default;
CREATE TABLE orders_table (
  order_id STRING,
  order_amount DOUBLE,
  product_id STRING,
  log_ts TIMESTAMP(3),
  proctime as PROCTIME()
) WITH (...);


-- 流式sql, kafka加入一个hive维度表。Flink将在缓存ttl过期后重新加载dimension_table中的所有数据。

SELECT * FROM orders_table AS o 
JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.product_id = dim.product_id;

Note:

  1. 每个连接的子任务需要保持自己的Hive表缓存。请确保Hive表能够装入TM任务槽的内存中。
  2. 它被鼓励为“流-源”设置一个相对较大的值。Monitor-interval ‘(作为时态表的最新分区)或’ lookup.join.cache. '。Ttl '(所有的分区作为时态表)。否则,由于表需要频繁更新和重新加载,Jobs容易出现性能问题。
  3. 目前,只要缓存需要刷新,我们就会加载整个Hive表。没有办法区分新数据和旧数据。

Writing #

Flink支持在“批处理”和“流”模式下从Hive写入数据。当作为一个“BATCH”应用程序运行时,Flink只会在Job完成时将这些记录写入Hive表。“BATCH”写入支持对现有表的追加和覆盖。

# ------ INSERT INTO将追加到表或分区,保持现有数据不变 ------ 
Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;

# ------ INSERT OVERWRITE将覆盖表或分区中的任何现有数据 ------ 
Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;

数据也可以插入到特定的分区中

# ------ 插入静态分区 ------ 
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;

# ------ 使用动态分区插入 ------ 
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';

# ------ 插入静态(my_type)和动态(my_date)分区 ------ 
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';

流式不断向Hive写入新数据, 增量地提交记录——使其可见. 用户可以通过几个属性来控制何时/如何触发提交. 流写入不支持插入覆盖。

下面的例子展示了如何使用流接收器来写一个流查询,通过分区提交将数据从Kafka写入到Hive表中, 并运行一个批量查询来读取该数据.

有关可用配置的完整列表,请参阅流接收器。

SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column
) WITH (...);

-- 流式sql,插入到hive表
INSERT INTO TABLE hive_table 
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;

-- 批处理sql,选择分区修剪
SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

如果水印定义在TIMESTAMP_LTZ列上,并使用“分区时间”提交,则“sink.partition-commit”。需要将Watermark-time-zone '设置为会话时区,否则提交的分区可能会在几个小时后发生。

SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 假设用户配置的时区为“亚洲/上海”
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  ts BIGINT, -- 时间(epoch毫秒)
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 定义TIMESTAMP_LTZ列上的水印
) WITH (...);

-- 流式sql,插入到hive表
INSERT INTO TABLE hive_table 
SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH')
FROM kafka_table;

-- 批处理sql,选择分区修剪
SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

默认情况下,对于流写入,Flink只支持重命名提交器,这意味着S3文件系统不能支持一次流写入。通过将以下参数配置为false,可以实现对S3的一次精确写入。这将指示接收器使用Flink的本地写入器,但只适用于拼花和orc文件类型。此配置在’ tablecconfig '中设置,并将影响作业的所有接收器。

KeyDefaultTypeDescription
table.exec.hive.fallback-mapred-writertrueBoolean如果为false,使用flink native writer来编写拼花和orc文件;如果是真的,使用hadoop mapred record writer来写parquet和orc文件。

Formats #

Flink的Hive集成已经针对以下文件格式进行了测试:

  • Text
  • CSV
  • SequenceFile
  • ORC
  • Parquet

来源:flink官方文档1.12

举报

相关推荐

0 条评论