Table of Contents
- Environment Setup
  - Configuring `pom.xml`
  - Configuring log4j
- Writing Data
- Reading Data
  - Regular Source API
    - Batch mode
    - Streaming mode
  - FLIP-27 Source API
    - Batch mode
    - Streaming mode
- Compacting Small Files
Data Lake Iceberg - Introduction (1)
Data Lake Iceberg - Storage Structure (2)
Data Lake Iceberg - Hive Integration with Iceberg (3)
Data Lake Iceberg - SparkSQL Integration (4)
Data Lake Iceberg - FlinkSQL Integration (5)
Data Lake Iceberg - FlinkSQL: Kafka-Type Table Data Cannot Be Written Successfully (6)
Data Lake Iceberg - Flink DataStream Integration (7)
Environment Setup
Configuring pom.xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.3</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> provided scope: compile-time only, not packaged into the jar or used at runtime -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <!-- enables the Flink web UI when running from the IDE -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.12</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.14 -->
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-flink-runtime-1.14</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                                <exclude>org.apache.hadoop:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Configuring log4j
Create a log4j.properties file under the resources directory.
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
Writing Data
Currently, both DataStream<RowData> and DataStream<Row> streams can be written to Iceberg tables.
// Imports used by the code examples in this article
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.api.datastream.{DataStream, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.data.{GenericRowData, RowData, StringData}
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.Table
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.flink.TableLoader
import org.apache.iceberg.flink.actions.{Actions, RewriteDataFilesAction}
import org.apache.iceberg.flink.sink.FlinkSink
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory
import org.apache.iceberg.flink.source.{FlinkSource, IcebergSource, StreamingStartingStrategy}

def writeData() = {
  // Path of the Iceberg table (Hadoop catalog layout)
  val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  // Build a one-element DataStream[RowData] as test input
  val input: SingleOutputStreamOperator[RowData] = env.fromElements("")
    .map(x => {
      val genericRowData = new GenericRowData(2)
      genericRowData.setField(0, 123455L)
      genericRowData.setField(1, StringData.fromString("99L"))
      genericRowData
    })
  // Load the Iceberg table from the Hadoop table path
  val loader: TableLoader = TableLoader.fromHadoopTable(path)
  FlinkSink.forRowData(input)
    .tableLoader(loader)
    // .set("write-format", "orc")                    // set additional write options
    // .set(FlinkWriteOptions.OVERWRITE_MODE, "true") // set additional write options
    .append()           // append mode
    // .overwrite(true) // overwrite mode
    // .upsert(true)    // upsert mode
  env.execute()
}
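The example above writes a DataStream[RowData]. Writing a DataStream[Row] goes through FlinkSink.forRow, which additionally needs a Flink TableSchema so the rows can be converted to Iceberg's internal format. Below is a minimal sketch assuming the same two-column layout as above; the column names id and data are only illustrative, not taken from this post's table definition.

// Sketch: writing a DataStream[Row] via FlinkSink.forRow (column names are assumptions)
import org.apache.flink.table.api.{DataTypes, TableSchema}
import org.apache.flink.types.Row

def writeRowData() = {
  val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  // A one-element DataStream[Row]; field types must match the schema below
  val input: DataStream[Row] = env.fromElements(Row.of(java.lang.Long.valueOf(123456L), "hello"))
  // TableSchema describing the Row layout, used for the Row -> RowData conversion
  val schema: TableSchema = TableSchema.builder()
    .field("id", DataTypes.BIGINT())
    .field("data", DataTypes.STRING())
    .build()
  FlinkSink.forRow(input, schema)
    .tableLoader(TableLoader.fromHadoopTable(path))
    .append()
  env.execute()
}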
Write configuration options:

| Option | Default | Description |
| --- | --- | --- |
| write-format | Parquet (table property `write.format.default`) | File format used by the write operation: parquet, avro, or orc |
| target-file-size-bytes | 536870912 (512 MB) (table property `write.target-file-size-bytes`) | Controls the size of the generated files, targeting approximately this many bytes |
| upsert-enabled | Table property `write.upsert.enabled` | Enables UPSERT mode for this write |
| overwrite-enabled | false | Overwrites the table's data; cannot be enabled together with UPSERT mode |
| distribution-mode | None (table property `write.distribution-mode`) | Defines how written data is distributed: none: do not shuffle rows; hash: hash-distribute by partition key; range: range-distribute by partition key or sort key if the table has a SortOrder |
| compression-codec | Table property `write.(fileformat).compression-codec` | Overrides the table's compression codec for this write |
| compression-level | Table property `write.(fileformat).compression-level` | Overrides the table's compression level for this write |
| compression-strategy | Table property `write.orc.compression-strategy` | Overrides the table's ORC compression strategy for this write |
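These options are set on the FlinkSink builder before the terminal append() call, either as string key/value pairs via set(...) or through the dedicated builder methods. A small sketch reusing the input stream and loader from the write example above; the option values are arbitrary examples, and upsert additionally requires the table to define identifier fields.

// Sketch: passing write options on the sink builder; the values are examples only
FlinkSink.forRowData(input)
  .tableLoader(loader)
  .set("write-format", "orc")                 // write ORC data files for this job
  .set("target-file-size-bytes", "134217728") // aim for roughly 128 MB data files
  .upsert(true)                               // requires identifier fields (primary key) on the table
  .append()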
Reading Data
Regular Source API
Batch mode
/**
 * Reads Iceberg table data in batch mode.
 *
 * @return the data read from the table
 * @author Jast
 */
def readDataForBatch() = {
  // Path of the Iceberg table
  val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
  // Get the Flink execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Load the Iceberg table from the path
  val tableLoader = TableLoader.fromHadoopTable(path)
  // Build a DataStream that consumes the Iceberg table data
  val batch: DataStream[RowData] = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    // .startSnapshotId() can specify which snapshot to start from
    .streaming(false)
    .build()
  // Map field 0 and field 1 of each row into a tuple of type (Long, StringData) and print it
  batch.map(r => (r.getLong(0), r.getString(1)))
    .returns(TypeInformation.of(new TypeHint[(Long, StringData)] {}))
    .print("data")
  // Execute the Flink job
  env.execute("Test Iceberg Read")
}
Streaming mode
def readDataForStream() = {
  // Path of the Iceberg table
  val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
  // Get the Flink execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Load the Iceberg table from the path
  val tableLoader = TableLoader.fromHadoopTable(path)
  // Build a DataStream that continuously consumes the Iceberg table data
  val stream: DataStream[RowData] = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(true)
    .build()
  // Map field 0 and field 1 of each row into a tuple of type (Long, StringData) and print it
  stream.map(r => (r.getLong(0), r.getString(1)))
    .returns(TypeInformation.of(new TypeHint[(Long, StringData)] {}))
    .print("stream data")
  // Execute the Flink job
  env.execute("Test Iceberg Read")
}
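The commented-out startSnapshotId() in the batch example hints at incremental consumption: in streaming mode the builder can be told to start reading from a specific snapshot instead of the current table state. A minimal sketch reusing env and tableLoader from the method above; the snapshot ID is a placeholder, take a real one from the table's snapshot metadata.

// Sketch: incremental streaming read starting after a given snapshot
// The snapshot ID below is a placeholder value
val incremental: DataStream[RowData] = FlinkSource.forRowData()
  .env(env)
  .tableLoader(tableLoader)
  .streaming(true)
  .startSnapshotId(1234567890123456789L) // consume changes committed after this snapshot
  .build()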
FLIP-27 Source API
Batch mode
def readDataForBatchFLIP27() = {
  // Path of the Iceberg table
  val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
  // Get the Flink execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Load the Iceberg table from the path
  val tableLoader = TableLoader.fromHadoopTable(path)
  // Build a FLIP-27 IcebergSource for batch reading
  val source: IcebergSource[RowData] = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory)
    .build()
  val batch: DataStream[RowData] = env.fromSource(
    source,
    org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
    "Iceberg Source",
    TypeInformation.of(classOf[RowData])
  )
  // Map field 0 and field 1 of each row into a tuple of type (Long, StringData) and print it
  batch.map(r => (r.getLong(0), r.getString(1)))
    .returns(TypeInformation.of(new TypeHint[(Long, StringData)] {}))
    .print("FLIP-27 data")
  // Execute the Flink job
  env.execute("Test Iceberg Read")
}
Streaming mode
def readDataForStreamFLIP27() = {
  // Path of the Iceberg table
  val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
  // Get the Flink execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Load the Iceberg table from the path
  val tableLoader = TableLoader.fromHadoopTable(path)
  // Build a FLIP-27 IcebergSource for streaming reading, starting from the latest snapshot
  val source: IcebergSource[RowData] = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory)
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .build()
  val stream: DataStream[RowData] = env.fromSource(
    source,
    org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
    "Iceberg Source",
    TypeInformation.of(classOf[RowData])
  )
  // Map field 0 and field 1 of each row into a tuple of type (Long, StringData) and print it
  stream.map(r => (r.getLong(0), r.getString(1)))
    .returns(TypeInformation.of(new TypeHint[(Long, StringData)] {}))
    .print("FLIP-27 stream data")
  // Execute the Flink job
  env.execute("Test Iceberg Read")
}
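In streaming mode the FLIP-27 source periodically checks the table for newly committed snapshots and plans new splits from them. The polling interval can be adjusted on the builder; a short sketch reusing tableLoader from the method above, where the 30-second interval and the EARLIEST starting strategy are just example choices.

// Sketch: tune how often the streaming IcebergSource looks for new snapshots
import java.time.Duration

val tunedSource: IcebergSource[RowData] = IcebergSource.forRowData()
  .tableLoader(tableLoader)
  .assignerFactory(new SimpleSplitAssignerFactory)
  .streaming(true)
  .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
  .monitorInterval(Duration.ofSeconds(30)) // how often to check for newly committed snapshots
  .build()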
Compacting Small Files
/**
 * Compacts small data files in an Iceberg table.
 */
def mergeSmallFiles() = {
  // 1. Obtain the Table object
  // 1.1 Create a Hadoop Configuration
  val conf: Configuration = new Configuration()
  // 1.2 Create a HadoopCatalog from the Configuration and the warehouse path
  val hadoopCatalog = new HadoopCatalog(conf, "hdfs://hadoop1:8020/warehouse/spark-iceberg")
  // 1.3 Build a TableIdentifier from the database name and table name
  val tableId: TableIdentifier = TableIdentifier.of("default", "a")
  // 1.4 Load the Table object through the catalog
  val table: Table = hadoopCatalog.loadTable(tableId)
  // With a Table object you can inspect metadata and perform table maintenance, e.g.:
  // println(table.history())
  // table.expireSnapshots().expireOlderThan(...)
  // 2. Run the compaction through Actions
  // 2.1 Actions.forTable() creates an Actions instance bound to the table
  val actions: Actions = Actions.forTable(table)
  // 2.2 rewriteDataFiles() creates the compaction (rewrite) action
  val rewrite: RewriteDataFilesAction = actions.rewriteDataFiles()
  // 2.3 targetSizeInBytes() sets the target size of the rewritten files (1024 bytes here, demo only)
  val withTargetSize: RewriteDataFilesAction = rewrite.targetSizeInBytes(1024L)
  // 2.4 execute() performs the compaction
  withTargetSize.execute()
}
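Compaction rewrites data files, but the replaced files remain referenced by older snapshots and are only removed after those snapshots are expired. A minimal sketch of expiring snapshots through the same table handle loaded above; the 7-day retention is an arbitrary example value.

// Sketch: expire snapshots older than a cutoff so replaced data files can be cleaned up
// The 7-day retention below is only an example value
val retainMillis = 7L * 24 * 60 * 60 * 1000
val expireBefore = System.currentTimeMillis() - retainMillis
table.expireSnapshots()
  .expireOlderThan(expireBefore) // expire all snapshots committed before this timestamp
  .commit()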