Data Lake Iceberg - Flink DataStream Integration (7)



Table of Contents

  • Environment Setup
    • Configure pom.xml
    • Configure log4j
  • Writing Data
  • Reading Data
    • Regular Source API
      • Batch mode
      • Streaming mode
    • FLIP-27 Source API
      • Batch mode
      • Streaming mode
  • Compacting Small Files



Data Lake Iceberg - Introduction (1)


Data Lake Iceberg - Storage Structure (2)


Data Lake Iceberg - Hive Integration with Iceberg (3)


Data Lake Iceberg - SparkSQL Integration (4)


Data Lake Iceberg - FlinkSQL Integration (5)


Data Lake Iceberg - FlinkSQL: Kafka Table Data Fails to Write Successfully (6)


Data Lake Iceberg - Flink DataStream Integration (7)

Environment Setup

Configure pom.xml

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.14.3</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>  provided scope: compile-time only, not packaged into the jar or used at runtime -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <!-- Enables the Flink Web UI when running inside the IDE -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <!--            <scope>provided</scope>-->
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.14 -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime-1.14</artifactId>
            <version>1.1.0</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Configure log4j

Create a log4j.properties file under the resources directory:

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

Writing Data

Writing to an Iceberg table is currently supported for both DataStream<RowData> and DataStream<Row> inputs. The example below writes a DataStream<RowData>:

def writeData() = {
    val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Build a one-element DataStream[RowData] as the input to write
    val input: SingleOutputStreamOperator[RowData] = env.fromElements("")
      .map(x => {
        import org.apache.flink.table.data.GenericRowData
        val genericRowData = new GenericRowData(2)
        genericRowData.setField(0, 123455L)
        genericRowData.setField(1, StringData.fromString("99L"))
        genericRowData
      })

    // Load the Iceberg table from the Hadoop table path and append the stream to it
    val loader: TableLoader = TableLoader.fromHadoopTable(path)
    FlinkSink.forRowData(input)
      .tableLoader(loader)
      //      .set("write-format", "orc")                    // additional write options (see the table below)
      //      .set(FlinkWriteOptions.OVERWRITE_MODE, "true") // additional write options (see the table below)
      .append() // append mode
    //        .overwrite(true) // overwrite mode (call on the builder before append())
    //        .upsert(true)    // upsert mode (call on the builder before append())
    env.execute()
  }
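
FlinkSink can also consume a DataStream of org.apache.flink.types.Row via FlinkSink.forRow, which additionally requires a TableSchema so the rows can be converted to Iceberg's internal format. Below is a minimal sketch, assuming the sample table has two columns (id BIGINT, data STRING); the schema, field names, and values here are illustrative, not taken from the original article:

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, TableSchema}
import org.apache.flink.types.Row
import org.apache.iceberg.flink.TableLoader
import org.apache.iceberg.flink.sink.FlinkSink

def writeRowData() = {
    // Same sample table path as above
    val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Assumed Flink schema matching the sample table: (id BIGINT, data STRING)
    val schema: TableSchema = TableSchema.builder()
      .field("id", DataTypes.BIGINT())
      .field("data", DataTypes.STRING())
      .build()

    // A single illustrative Row; declare the row type explicitly for the serializer
    val input = env.fromElements(Row.of(java.lang.Long.valueOf(123456L), "hello"))
      .returns(Types.ROW(Types.LONG, Types.STRING))

    // forRow needs the TableSchema to convert each Row into Iceberg's internal RowData
    FlinkSink.forRow(input, schema)
      .tableLoader(TableLoader.fromHadoopTable(path))
      .append()

    env.execute("Write Row to Iceberg")
  }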

Write configuration options:

Option                 | Default            | Description
-----------------------|--------------------|------------------------------------------------------------
write-format           | Parquet            | Same as write.format.default; file format used for writes: parquet, avro, or orc
target-file-size-bytes | 536870912 (512 MB) | Same as write.target-file-size-bytes; controls the approximate target size of the generated files
upsert-enabled         |                    | Same as write.upsert.enabled
overwrite-enabled      | false              | Overwrites the table data; cannot be enabled together with upsert mode
distribution-mode      | None               | Same as write.distribution-mode; defines how written rows are distributed: none (do not shuffle rows), hash (hash-distribute by partition key), range (distribute by partition key or sort key if the table has a SortOrder)
compression-codec      |                    | Same as write.(fileformat).compression-codec
compression-level      |                    | Same as write.(fileformat).compression-level
compression-strategy   |                    | Same as write.orc.compression-strategy
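
These options can be set per job on the FlinkSink builder via set(key, value), as hinted by the commented .set(...) calls in writeData() above. A minimal sketch; the option values chosen below are only examples:

import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.data.RowData
import org.apache.iceberg.flink.TableLoader
import org.apache.iceberg.flink.sink.FlinkSink

// `input` and `path` play the same roles as in writeData() above
def writeWithOptions(input: DataStream[RowData], path: String) = {
    FlinkSink.forRowData(input)
      .tableLoader(TableLoader.fromHadoopTable(path))
      .set("write-format", "orc")                  // write ORC files instead of the default Parquet
      .set("target-file-size-bytes", "536870912")  // aim for roughly 512 MB data files
      .set("distribution-mode", "hash")            // hash-distribute rows by partition key before writing
      .append()
  }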

Reading Data

Regular Source API (FlinkSource)

Batch mode

/**
   * Reads data from an Iceberg table in batch mode.
   *
   * @return the data read from the table
   * @author Jast
   */
  def readDataForBatch() = {
    // Path of the Iceberg table
    val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
    // Get the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Load the Iceberg table from the path
    val tableLoader = TableLoader.fromHadoopTable(path)
    // Build a DataStream that consumes the Iceberg table
    val batch: DataStream[RowData] = FlinkSource.forRowData()
      .env(env)
      .tableLoader(tableLoader)
      //      .startSnapshotId() // optionally start reading from a specific snapshot
      .streaming(false)
      .build()
    // Map field 0 and field 1 of each row into a Tuple2[Long, StringData] and print the result
    batch.map(r => (r.getLong(0), r.getString(1)))
      .returns(TypeInformation.of(new TypeHint[(Long, StringData)] {}))
      .print("data")

    // Execute the Flink job
    env.execute("Test Iceberg Read")
  }

Streaming mode

def readDataForStream() = {
    // Path of the Iceberg table
    val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
    // Get the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Load the Iceberg table from the path
    val tableLoader = TableLoader.fromHadoopTable(path)
    // Build a DataStream that continuously consumes the Iceberg table
    val stream: DataStream[RowData] = FlinkSource.forRowData()
      .env(env)
      .tableLoader(tableLoader)
      .streaming(true)
      .build()
    // Map field 0 and field 1 of each row into a Tuple2[Long, StringData] and print the result
    stream.map(r => (r.getLong(0), r.getString(1)))
      .returns(TypeInformation.of(new TypeHint[(Long, StringData)] {}))
      .print("stream")

    // Execute the Flink job
    env.execute("Test Iceberg Read")
  }
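
As the commented .startSnapshotId() call in the batch example suggests, the streaming reader can also start incrementally from a specific snapshot instead of the current table state. A minimal sketch reusing env and tableLoader from readDataForStream(); the snapshot id below is a placeholder, take a real one from the table's snapshot history:

// Incremental streaming read: only consume data committed after the given snapshot.
// The snapshot id is a placeholder; look up a real one in the table metadata.
val incremental: DataStream[RowData] = FlinkSource.forRowData()
  .env(env)
  .tableLoader(tableLoader)
  .streaming(true)
  .startSnapshotId(3821550127947089987L)
  .build()

incremental.print("incremental")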

FLIP-27 Source API (IcebergSource)

Batch mode

def readDataForBatchFLIP27() = {
    // Path of the Iceberg table
    val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
    // Get the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Load the Iceberg table from the path
    val tableLoader = TableLoader.fromHadoopTable(path)

    // Build a FLIP-27 IcebergSource with a simple split assigner
    val source: IcebergSource[RowData] = IcebergSource.forRowData()
      .tableLoader(tableLoader)
      .assignerFactory(new SimpleSplitAssignerFactory)
      .build()

    val batch: DataStream[RowData] = env.fromSource(
      source,
      org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
      "Iceberg Source",
      TypeInformation.of(classOf[RowData])
    )

    // Map field 0 and field 1 of each row into a Tuple2[Long, StringData] and print the result
    batch.map(r => (r.getLong(0), r.getString(1)))
      .returns(TypeInformation.of(new TypeHint[(Long, StringData)] {}))
      .print("FLIP-27 batch")

    // Execute the Flink job
    env.execute("Test Iceberg Read")
  }

Streaming mode

def readDataForStreamFLIP27() = {
    // Path of the Iceberg table
    val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
    // Get the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Load the Iceberg table from the path
    val tableLoader = TableLoader.fromHadoopTable(path)
    // Build a streaming FLIP-27 IcebergSource that starts from the latest snapshot
    val source: IcebergSource[RowData] = IcebergSource.forRowData()
      .tableLoader(tableLoader)
      .assignerFactory(new SimpleSplitAssignerFactory)
      .streaming(true)
      .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
      .build()

    val stream: DataStream[RowData] = env.fromSource(
      source,
      org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
      "Iceberg Source",
      TypeInformation.of(classOf[RowData])
    )
    // Map field 0 and field 1 of each row into a Tuple2[Long, StringData] and print the result
    stream.map(r => (r.getLong(0), r.getString(1)))
      .returns(TypeInformation.of(new TypeHint[(Long, StringData)] {}))
      .print("FLIP-27 stream")

    // Execute the Flink job
    env.execute("Test Iceberg Read")
  }
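
The streaming FLIP-27 source periodically polls the table for newly committed snapshots; the polling interval and the starting position are configurable on the builder. A minimal sketch reusing tableLoader from the example above; the 60-second interval is only an example value:

import java.time.Duration

// Reuses tableLoader from readDataForStreamFLIP27() above.
val tunedSource: IcebergSource[RowData] = IcebergSource.forRowData()
  .tableLoader(tableLoader)
  .assignerFactory(new SimpleSplitAssignerFactory)
  .streaming(true)
  // Other starting strategies include TABLE_SCAN_THEN_INCREMENTAL and INCREMENTAL_FROM_EARLIEST_SNAPSHOT.
  .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
  .monitorInterval(Duration.ofSeconds(60)) // how often to look for newly committed snapshots (example value)
  .build()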

Compacting Small Files

/**
   * Compact the small data files of an Iceberg table.
   */
  def mergeSmallFiles() = {
    // 1. Obtain the Table object
    // 1.1 Create a Hadoop Configuration
    val conf: Configuration = new Configuration()
    // 1.2 Create a HadoopCatalog from the Configuration and the warehouse path
    val hadoopCatalog = new HadoopCatalog(conf, "hdfs://hadoop1:8020/warehouse/spark-iceberg")
    // 1.3 Build a TableIdentifier from the database name and the table name
    val tableId: TableIdentifier = TableIdentifier.of("default", "a")
    // 1.4 Load the Table through the catalog
    val table: Table = hadoopCatalog.loadTable(tableId)

    // With a Table object you can inspect metadata and run table maintenance, e.g.
    //    println(table.history())
    //    table.expireSnapshots() // snapshot expiration (see the sketch after this method)

    // 2. Compact the data files through Actions
    // 2.1 Create an Actions instance for the table
    val actions: Actions = Actions.forTable(table)
    // 2.2 rewriteDataFiles() creates the compaction (rewrite) action
    val rewriteAction: RewriteDataFilesAction = actions.rewriteDataFiles()
    // 2.3 Set the target size of the rewritten data files (1024 bytes here, purely for demonstration)
    rewriteAction.targetSizeInBytes(1024L)
    // 2.4 Execute the compaction
    rewriteAction.execute()
  }
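
The commented maintenance calls in mergeSmallFiles() also mention snapshot expiration, which is usually run alongside compaction to clean up old metadata and unreferenced data files. A minimal sketch using the core Table API and reusing the table object loaded above; the 5-day retention is an arbitrary example:

import java.util.concurrent.TimeUnit

// Reuses the `table` object loaded in mergeSmallFiles() above.
// The 5-day retention is an example; keep enough snapshots for your time-travel / rollback needs.
val olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(5)
table.expireSnapshots()
  .expireOlderThan(olderThanMillis)
  .commit()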

