0
点赞
收藏
分享

微信扫一扫

flink 流批统一优化整理

以沫的窝 2021-09-21 阅读 68

流批一体架构


A.flink 1.11 及之前

  • 统一了Tabel/SQL API & Planner
  • 统一shuffle架构

B.flink1.12

1.DataStream API 批执行模式

背景:flink 中虽然上层Table/Sql已经流批统一,但底层api仍是分开的,DataStream和DataSet。

因为批处理是流处理的特例,所以讲两种合并成统一的API,这样的好处是:

a. 具有好的复用性,作业可以在流和批这两种执行模式之间自由地切换,而无需重写任何代码。因此,用户可以复用同一个作业,来处理实时数据和历史数据。

b.维护简单,统一的 API 意味着流和批可以共用同一组 connector,维护同一套代码,并能够轻松地实现流批混合执行,例如 backfilling 之类的场景。

考虑到这些优点,社区已朝着流批统一的 DataStream API 迈出了第一步:支持高效的批处理(FLIP-134)。

从长远来看,这意味着 DataSet API 将被弃用(FLIP-131),其功能将被包含在 DataStream API 和 Table API / SQL 中。

■ 有限流上的批处理
您已经可以使用 DataStream API 来处理有限流(例如文件)了,但需要注意的是,运行时并不“知道”作业的输入是有限的。为了优化在有限流情况下运行时的执行性能,新的 BATCH 执行模式,对于聚合操作,全部在内存中进行,且使用 sort-based shuffle(FLIP-140)和优化过的调度策略(请参见 Pipelined Region Scheduling 了解更多详细信息)。因此,DataStream API 中的 BATCH 执行模式已经非常接近 Flink 1.12 中 DataSet API 的性能。有关性能的更多详细信息,请查看 FLIP-140。

在 Flink 1.12 中,默认执行模式为 STREAMING,要将作业配置为以 BATCH 模式运行,可以在提交作业的时候,设置参数 execution.runtime-mode:

$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar

或者通过编程的方式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeMode.BATCH); 

2. 流批统一Source & Sink API

  • 1.11版本已经支持了 source connector 工作在流批两种模式下。
    TODO
  • 1.12支持了对Data Sink API的重构。

现有只支持FileSInkConnector。替换现有的StreamingFileSink Connector。

新的抽象引入了 write/commit 协议和一个更加模块化的接口。Sink 的实现者只需要定义 whathow

SinkWriter,用于写数据,并输出需要 commit 的内容(例如,committables);

Committer 和 GlobalCommitter,封装了如何处理 committables。

框架会负责 whenwhere:即在什么时间,以及在哪些机器或进程中 commit。

3. Pipelined Region 调度 (FLIP-119)

1). 1.12之前方案

(1)前提:两种模式:

a. pipelined result: 数据顺序一个一个的消费

b.blocking result: 在上游所有数据生成完成才开始执行。

(2)版本1.12之前是流批分开的

  • sreaming: (使用的是a方式)

  • batch: (stage内部使用的是pipelined,stage之间使用的是blocking)

这种方式优点:a.只用调度有数据的stage,所以更高效。 b.stage fail可以单独重启,不需要重新计算其他stage。

(3) before 1.19 调度策略

统一的调度器需要对每个阶段,包括流处理和批处理,都要好的资源调度。1.12之前采用的是不同的调度策略,分别解决流批问题。

a. “all at once”

立刻执行,用于流处理。对于批处理,立刻执行可能会影响资源利用率,可能导致资源预先分配,等待上游数据而导致资源浪费。

b."lazy from sources"

对于批处理,使用懒加载方式,即input数据准备好之后再分配后续operator的资源。

这个策略独立运行在每个子任务中,所以不会识别同时在运行的所有subtask.

举例:

A 是批数据,B是流表,C是需要join。

slot=1:B-C chain,那么C因为A未完成而无法执行。flink会尝试部署A,因为没有slot导致job失败。

slot=2:这时可用,flink能部署A,job能成功执行,但是当A在执行的时候,第一个slot会被B和C占用而浪费资源。

失败情况: 如果B→C失败,我们不用再重新执行A,但是1.9之前是不支持的。

社区为支持流批统一,设计了一个统一的调度和失败策略,Pipelined region scheduling.

2). pilelined region scheduling

新调度策略在开始substask之前,通过分析ExecutionGraph,识别出pipelined region。

region内部使用的是pipelined方式,外部使用的是blocking方式。

(1)调度策略:

在region内,消费者需要不断消费生产的数据,以保证生产者不被block,并且避免背压。因此region的所有子任务必须被调度,失败是整体重启,同时运行。

图中r1→( r2,r3)→ r4,如果jobmanager有足够资源,那么在上游数据finished之后,将尽可能的执行更多的下游region。子任务执行是根据region分配的,要么成功,要么失败。

(2)失败策略

当然子任务失败,那么region重启,重新消费输入数据。如果一些输入数据丢失,那么flink会重新执行上游生产region。

好处:

1.可以在有限资源情况下,尽可能的执行批任务。

2.可以提高资源利用率并消除死锁。

参考:https://flink.apache.org/2020/12/15/pipelined-region-sheduling.html

C. flink1.13

1.大规模作业调度

背景:

由于在创建图的时候,边会存储对象,那么在大规模作业调度时,会占用大量内存。

引入

A. 在ExecutionGraph中有两种,一种是pointwise模式(一对一或一对多),还有一种是alltoall(多堆多)

B. 什么情况是pointwise模式?

代码中:


ForwardPartitioner 和RescalePartitioner 属于pointwise模式,其他的均属于多对多。

C. 针对这两种方式,将多消费者合成消费组,减少对象创建,将O(n)变成了O(1)


2.sort-merge shuffle

中间数据是如何保存和读取的?

背景:
针对批作业,在数据shuffle的优化。
上游跑完写中间文件
节省资源,不需要上游和下游同时起来。
失败不需要重新执行。


flink 默认的shuffle,给每个下游输出单独文件。

  • 大量小文件
  • 内存浪费,每个文件至少用1个buffer
  • 下游数据读取产生大量随机I/O

新方案:sort shuffle

  1. 先写缓冲区,把数据按照不同的下游分组,最后写入文件


(1)申请固定大小缓冲区,避免缓冲区随着规模增大而增大
(2)数据写入缓冲区,在缓冲区满的时候会对数据进行排序(合并分区),然后写入单独文件。后边数据接着写到文件后边。文件有多个段,每个段内有序。

没有采用外排序,merge不划算。

  1. 下游上层做I/O调度,下游读取是通过一个调度器。



参考:https://wiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink

3. 有限作业一致性保证

背景:有限流不能做checkpoint,无法保证一致性。

优化:


所有subtask结束,只存标记
部分subtask结束,会存储剩下部分的数据。

结束语义整理:
数据有限正常结束
savepoint结束

endofinput 通知,统一做checkpoint,保证最后数据一定会提交到系统中。

stopwithsavepoint,不同统一做checkpoint,
正常结束,认为任务不再重启,调用endofinput,提交最后数据。
stop-with-savepoint,通过savepoint结束,后期会重启,不会提交最后数据
stop-with-savepoint --drain ,通过savepoint结束,后期不会重启,调用endofinput,提交最后数据。

参考:https://developer.aliyun.com/live/246712?spm=a2c6h.12873639.0.0.2f9612a824wQIq

举报

相关推荐

0 条评论