0
点赞
收藏
分享

微信扫一扫

Flink教程(24)- Flink高级特性(File Sink)

zidea 2022-03-22 阅读 96



文章目录

  • 01 引言
  • 02 File Sink介绍
  • 03 File Sink案例演示
  • 04 文末

01 引言

在前面的博客,我们学习了Flink​的Streaming File Sink了,有兴趣的同学可以参阅下:

  • 《Flink教程(01)- Flink知识图谱》
  • 《Flink教程(02)- Flink入门》
  • 《Flink教程(03)- Flink环境搭建》
  • 《Flink教程(04)- Flink入门案例》
  • 《Flink教程(05)- Flink原理简单分析》
  • 《Flink教程(06)- Flink批流一体API(Source示例)》
  • 《Flink教程(07)- Flink批流一体API(Transformation示例)》
  • 《Flink教程(08)- Flink批流一体API(Sink示例)》
  • 《Flink教程(09)- Flink批流一体API(Connectors示例)》
  • 《Flink教程(10)- Flink批流一体API(其它)》
  • 《Flink教程(11)- Flink高级API(Window)》
  • 《Flink教程(12)- Flink高级API(Time与Watermaker)》
  • 《Flink教程(13)- Flink高级API(状态管理)》
  • 《Flink教程(14)- Flink高级API(容错机制)》
  • 《Flink教程(15)- Flink高级API(并行度)》
  • 《Flink教程(16)- Flink Table与SQL》
  • 《Flink教程(17)- Flink Table与SQL(案例与SQL算子)》
  • 《Flink教程(18)- Flink阶段总结》
  • 《Flink教程(19)- Flink高级特性(BroadcastState)》
  • 《Flink教程(20)- Flink高级特性(双流Join)》
  • 《Flink教程(21)- Flink高级特性(End-to-End Exactly-Once)》
  • 《Flink教程(22)- Flink高级特性(异步IO)》
  • 《Flink教程(23)- Flink高级特性(Streaming File Sink)》

本文主要讲解Flink的高级特性其中之一的 File Sink。

02 File Sink介绍

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html


Flink教程(24)- Flink高级特性(File Sink)_sql

新的 Data Sink API (Beta):

  • 之前发布的 Flink 版本中[1],已经支持了 source connector 工作在流批两种模式下,因此在 Flink 1.12 中,社区着重实现了统一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit 协议和一个更加模块化的接口。Sink 的实现者只需要定义 what 和 how:SinkWriter,用于写数据,并输出需要 commit 的内容(例如,committables);Committer 和 GlobalCommitter,封装了如何处理 committables。框架会负责 when 和 where:即在什么时间,以及在哪些机器或进程中 commit。

Flink教程(24)- Flink高级特性(File Sink)_hive_02

这种模块化的抽象允许为 BATCH 和 STREAMING 两种执行模式,实现不同的运行时策略,以达到仅使用一种 sink 实现,也可以使两种模式都可以高效执行。Flink 1.12 中,提供了统一的 FileSink connector,以替换现有的 StreamingFileSink connector (FLINK-19758)。其它的 connector 也将逐步迁移到新的接口。

Flink 1.12的 FileSink 为批处理和流式处理提供了一个统一的接收器,它将分区文件写入Flink文件系统抽象所支持的文件系统。这个文件系统连接器为批处理和流式处理提供了相同的保证,它是现有流式文件接收器的一种改进。

03 File Sink案例演示

/**
* @author : YangLinWei
* @createTime: 2022/3/9 9:15 上午
*/
public class FileSinkDemo {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));
env.setStateBackend(new FsStateBackend("file:///D:/ckp"));

//2.source
DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

//3.sink
//设置sink的前缀和后缀
//文件的头和文件扩展名
//prefix-xxx-.txt
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".txt")
.build();

//设置sink的路径
String outputPath = "hdfs://node1:8020/FlinkFileSink/parquet";

final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withBucketAssigner(new DateTimeBucketAssigner<>())
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.withOutputFileConfig(config)
.build();

lines.sinkTo(sink).setParallelism(1);

env.execute();
}
}

04 文末

本文主要讲解Flink的高级特性其中之一的File Sink,谢谢大家的阅读,本文完!



举报

相关推荐

0 条评论