第13讲:如何实现生产环境中的 Flink 高可用配置
我们在第 06 课时“Flink 集群安装部署和 HA 配置”中讲解了 Flink 的几种常见部署模式,并且简单地介绍了 HA 配置。
概述
事实上,集群的高可用(High Availablility,以下简称 HA)配置是大数据领域经典的一个问题。
我们在第 03 课时“Flink 的编程模型与其他框架比较”中也提到过 Flink 集群中的角色,其中 JobManager 扮演的是集群管理者的角色,负责调度任务、协调 Checkpoints、协调故障恢复、收集 Job 的状态信息,并管理 Flink 集群中的从节点 TaskManager。
在默认的情况下,我们的每个集群都只有一个 JobManager 实例,假如这个 JobManager 崩溃了,那么将会导致我们的作业运行失败,并且无法提交新的任务。
因此,在生产环境中我们的集群应该如何配置以达到高可用的目的呢?针对不同模式进行部署的集群,我们需要不同的配置。
源码分析
Flink 中的 JobManager、WebServer 等组件都需要高可用保障,并且 Flink 还需要进行 Checkpoint 元数据的持久化操作。与 Flink HA 相关的类图如下图所示,我们跟随源码简单看一下 Flink HA 的实现。
HighAvailabilityMode 类中定义了三种高可用性模式枚举,如下图所示:
-
NONE:非 HA 模式
-
ZOOKEEPER:基于 ZK 实现 HA
-
FACTORY_CLASS:自定义 HA 工厂类,该类需要实现 HighAvailabilityServicesFactory 接口
具体的高可用实例对象创建则在 HighAvailabilityServicesUtils 类中有体现,如下图所示:
创建 HighAvailabilityServices 的实例方法如下:
public static HighAvailabilityServices createHighAvailabilityServices(
Configuration configuration,
Executor executor,
AddressResolution addressResolution) throws Exception {
HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
switch (highAvailabilityMode) {
case NONE:
// 省略部分代码
// 返回非HA服务类实例
return new StandaloneHaServices(
resourceManagerRpcUrl,
dispatcherRpcUrl,
jobManagerRpcUrl,
String.format("%s%s:%s", protocol, address, port));
case ZOOKEEPER:
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
<span class="hljs-comment">// 返回ZK HA 服务类实例</span>
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> ZooKeeperHaServices(
ZooKeeperUtils.startCuratorFramework(configuration),
executor,
configuration,
blobStoreService);
<span class="hljs-keyword">case</span> FACTORY_CLASS:
<span class="hljs-comment">// 返回自定义 HA 服务类实例</span>
<span class="hljs-keyword">return</span> createCustomHAServices(configuration, executor);
<span class="hljs-keyword">default</span>:
<span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> Exception(<span class="hljs-string">"Recovery mode "</span> + highAvailabilityMode + <span class="hljs-string">" is not supported."</span>);
}
}
HighAvailabilityServices 接口定义了 HA 服务类应当实现的方法,实现类主要有 StandaloneHaServices(非 HA)、ZooKeeperHaServices、YarnHighAvailabilityServices。
ZooKeeperHaServices 主要提供了创建 LeaderRetrievalService 和 LeaderElectionService 等方法,并给出了各个服务组件使用的 ZK 节点名称。
ZooKeeperLeaderElectionService 实现了 LeaderElectionService 中 leader 选举和获取 leader 的方法。
public interface LeaderElectionService {
// 启动 leader 选举服务
void start(LeaderContender contender) throws Exception;
// 停止 leader 选举服务
void stop() throws Exception;
// 获取新的 leader session ID
void confirmLeaderSessionID(UUID leaderSessionID);
// 是否拥有 leader
boolean hasLeadership(@Nonnull UUID leaderSessionId);
}
Standalone 集群高可用配置
简介
如果你的集群是 Standalone 模式,那么此时需要对 JobManager 做主备,一般推荐一个主 JobManager 和多个备用的 JobManagers。当你的主 JobManager 发生故障时,备用的 JobManager 会接管集群,以保证我们的任务正常运行。这里需要注意的是,主和备 JobManager 只是我们人为的区分,实际上它们并没有区别,每一个 JobManager 都可以当作主或者备。
Standalone 模式下的 HA 配置,Flink 依赖 ZooKeeper 实现。ZooKeeper 集群独立于 Flink 集群之外,主要被用来进行 Leader 选举和轻量级状态一致性存储。更多关于 ZooKeeper 的资料可以直接点击这里查看。
文件配置
在这里我们要特别说明的是,Flink 自带了一个简单的 ZooKeeper 集群,并且提供了一键启动的脚本。在实际生产环境中建议自己搭建 ZooKeeper 集群,以方便我们进行配置管理。
假设我们在 3 台虚拟机之间搭建 standalone 集群,并且进行高可用配置:
IP | hostname | 备注 |
---|---|---|
192.168.2.100 | master | 主节点、ZK 01 |
192.168.2.101 | slave01 | 从节点 01、ZK 02 |
192.168.2.102 | slave02 | 从节点 02、ZK 03 |
我们需要在 3 台机器上同时修改 Flink 配置文件中的 master 文件:
master:8081
slave01:8081
slave02:8081
表示指定 ZooKeeper 集群的访问地址。
然后,需要修改 conf/flink-conf.yaml 文件,与高可用的配置相关的几个参数,如下所示:
#========================================================
# High Availability
#=====================================================================
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one
high-availability.storageDir: hdfs:///flink/recovery
它们分别代表:
-
high-availability,高可用性模式设置为 zookeeper,用来打开高可用模式;
-
high-availability.zookeeper.quorum,指定一组 ZooKeeper 服务器,它提供分布式协调服务,Flink 可以在指定的地址和端口访问 ZooKeeper;
-
high-availability.zookeeper.path.root,指定 ZooKeeper 的根节点,并且在该节点下放置所有集群节点;
-
high-availability.cluster-id,为每个集群指定一个 ID,用来存储该集群的相关数据;
-
high-availability.storageDir,高可用存储目录,JobManager 的元数据保存在文件系统 storageDir 中,一般来讲是 HDFS 的地址。
对于 flink-conf.yaml 文件中的配置,除了 jobmanager.rpc.address 和 jobmanager.web.address 都各自配置自己机器的 IP 之外,其他的关于高可用的配置一模一样。
这里特别要注意下对于高可用性配置的部分。其中,high-availability、high-availability.storageDir 和 high-availability.zookeeper.quorum 这三项是必须配置的;后两项 high-availability.zookeeper.path.root 和 high-availability.cluster-id 配置是可选的,但是通常我们建议都手动进行配置,方便排查问题。
Yarn 集群高可用配置
与 Standalone 集群不同的是,Flink on Yarn 的高可用配置只需要一个 JobManager。当 JobManager 发生失败时,Yarn 负责将其重新启动。
我们需要修改 yarn-site.yaml 文件中的配置,如下所示:
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
yarn.resourcemanager.am.max-attempts 表示 Yarn 的 application master 的最大重试次数。
除了上述 HA 配置之外,还需要配置 flink-conf.yaml 中的最大重试次数:
yarn.application-attempts: 10
默认情况下,该配置的值为 2:
我们在 Flink 的官网中可以查到,当你的 yarn.application-attempts 配置为 10 的时候:
同时官网给出了重要提示:
-
YARN 2.3.0 < version < 2.4.0. All containers are restarted if the application master fails.
-
YARN 2.4.0 < version < 2.6.0. TaskManager containers are kept alive across application master failures. This has the advantage that the startup time is faster and that the user does not have to wait for obtaining the container resources again.
-
YARN 2.6.0 <= version: Sets the attempt failure validity interval to the Flinks’ Akka timeout value. The attempt failure validity interval says that an application is only killed after the system has seen the maximum number of application attempts during one interval. This avoids that a long lasting job will deplete it’s application attempts.
不同 Yarn 版本的容器关闭行为不同,需要我们特别注意。
-
YARN 2.3.0 < YARN 版本 < 2.4.0。如果 application master 进程失败,则所有的 container 都会重启。
-
YARN 2.4.0 < YARN 版本 < 2.6.0。TaskManager container 在 application master 故障期间,会继续工作。这样的优点是:启动时间更快,且缩短了所有 task manager 启动时申请资源的时间。
-
YARN 2.6.0 <= YARN 版本:失败重试的间隔会被设置为 Akka 的超时时间。在一次时间间隔内达到最大失败重试次数才会被置为失败。
另外,需要注意的是,假如你的 ZooKeeper 集群使用 Kerberos 安全模式运行,那么可以根据需要添加下面的配置:
zookeeper.sasl.service-name
zookeeper.sasl.login-context-name
如果你不想搭建自己的 ZooKeeper 集群或者简单地进行本地测试,你可以使用 Flink 自带的 ZooKeeper 集群,但是并不推荐,我们建议读者搭建自己的 ZooKeeper 集群。
总结
本课时我们主要讲解了 Flink 集群的高可用配置,Standalone 和 Yarn 集群的配置有所不同。在生产环境中,Flink 集群的高可用配置必不可少,并且我们从源码上简单分析了高可用配置的原理。
点击这里下载本课程源码
第14讲:Flink Exactly-once 实现原理解析
这一课时我们将讲解 Flink “精确一次”的语义实现原理,同时这也是面试的必考点。
Flink 的“精确一次”处理语义是,Flink 提供了一个强大的语义保证,也就是说在任何情况下都能保证数据对应用产生的效果只有一次,不会多也不会少。
那么 Flink 是如何实现“端到端的精确一次处理”语义的呢?
背景
通常情况下,流式计算系统都会为用户提供指定数据处理的可靠模式功能,用来表明在实际生产运行中会对数据处理做哪些保障。一般来说,流处理引擎通常为用户的应用程序提供三种数据处理语义:最多一次、至少一次和精确一次。
-
最多一次(At-most-Once):这种语义理解起来很简单,用户的数据只会被处理一次,不管成功还是失败,不会重试也不会重发。
-
至少一次(At-least-Once):这种语义下,系统会保证数据或事件至少被处理一次。如果中间发生错误或者丢失,那么会从源头重新发送一条然后进入处理系统,所以同一个事件或者消息会被处理多次。
-
精确一次(Exactly-Once):表示每一条数据只会被精确地处理一次,不多也不少。
Exactly-Once 是 Flink、Spark 等流处理系统的核心特性之一,这种语义会保证每一条消息只被流处理系统处理一次。“精确一次” 语义是 Flink 1.4.0 版本引入的一个重要特性,而且,Flink 号称支持“端到端的精确一次”语义。
在这里我们解释一下“端到端(End to End)的精确一次”,它指的是 Flink 应用从 Source 端开始到 Sink 端结束,数据必须经过的起始点和结束点。Flink 自身是无法保证外部系统“精确一次”语义的,所以 Flink 若要实现所谓“端到端(End to End)的精确一次”的要求,那么外部系统必须支持“精确一次”语义;然后借助 Flink 提供的分布式快照和两阶段提交才能实现。
分布式快照机制
我们在之前的课程中讲解过 Flink 的容错机制,Flink 提供了失败恢复的容错机制,而这个容错机制的核心就是持续创建分布式数据流的快照来实现。
同 Spark 相比,Spark 仅仅是针对 Driver 的故障恢复 Checkpoint。而 Flink 的快照可以到算子级别,并且对全局数据也可以做快照。Flink 的分布式快照受到 Chandy-Lamport 分布式快照算法启发,同时进行了量身定做,有兴趣的同学可以搜一下。
Barrier
Flink 分布式快照的核心元素之一是 Barrier(数据栅栏),我们也可以把 Barrier 简单地理解成一个标记,该标记是严格有序的,并且随着数据流往下流动。每个 Barrier 都带有自己的 ID,Barrier 极其轻量,并不会干扰正常的数据处理。
如上图所示,假如我们有一个从左向右流动的数据流,Flink 会依次生成 snapshot 1、 snapshot 2、snapshot 3……Flink 中有一个专门的“协调者”负责收集每个 snapshot 的位置信息,这个“协调者”也是高可用的。
Barrier 会随着正常数据继续往下流动,每当遇到一个算子,算子会插入一个标识,这个标识的插入时间是上游所有的输入流都接收到 snapshot n。与此同时,当我们的 sink 算子接收到所有上游流发送的 Barrier 时,那么就表明这一批数据处理完毕,Flink 会向“协调者”发送确认消息,表明当前的 snapshot n 完成了。当所有的 sink 算子都确认这批数据成功处理后,那么本次的 snapshot 被标识为完成。
这里就会有一个问题,因为 Flink 运行在分布式环境中,一个 operator 的上游会有很多流,每个流的 barrier n 到达的时间不一致怎么办?这里 Flink 采取的措施是:快流等慢流。
异步和增量
按照上面我们介绍的机制,每次在把快照存储到我们的状态后端时,如果是同步进行就会阻塞正常任务,从而引入延迟。因此 Flink 在做快照存储时,可采用异步方式。
此外,由于 checkpoint 是一个全局状态,用户保存的状态可能非常大,多数达 G 或者 T 级别。在这种情况下,checkpoint 的创建会非常慢,而且执行时占用的资源也比较多,因此 Flink 提出了增量快照的概念。也就是说,每次都是进行的全量 checkpoint,是基于上次进行更新的。
两阶段提交
上面我们讲解了基于 checkpoint 的快照操作,快照机制能够保证作业出现 fail-over 后可以从最新的快照进行恢复,即分布式快照机制可以保证 Flink 系统内部的“精确一次”处理。但是我们在实际生产系统中,Flink 会对接各种各样的外部系统,比如 Kafka、HDFS 等,一旦 Flink 作业出现失败,作业会重新消费旧数据,这时候就会出现重新消费的情况,也就是重复消费。
针对这种情况,Flink 1.4 版本引入了一个很重要的功能:两阶段提交,也就是 TwoPhaseCommitSinkFunction。两阶段搭配特定的 source 和 sink(特别是 0.11 版本 Kafka)使得“精确一次处理语义”成为可能。
在 Flink 中两阶段提交的实现方法被封装到了 TwoPhaseCommitSinkFunction 这个抽象类中,我们只需要实现其中的beginTransaction、preCommit、commit、abort 四个方法就可以实现“精确一次”的处理语义,实现的方式我们可以在官网中查到:
Flink-Kafka Exactly-once
如上图所示,我们用 Kafka-Flink-Kafka 这个案例来介绍一下实现“端到端精确一次”语义的过程,整个过程包括:
-
从 Kafka 读取数据
-
窗口聚合操作
-
将数据写回 Kafka
整个过程可以总结为下面四个阶段:
-
一旦 Flink 开始做 checkpoint 操作,那么就会进入 pre-commit 阶段,同时 Flink JobManager 会将检查点 Barrier 注入数据流中 ;
-
当所有的 barrier 在算子中成功进行一遍传递,并完成快照后,则 pre-commit 阶段完成;
-
等所有的算子完成“预提交”,就会发起一个“提交”动作,但是任何一个“预提交”失败都会导致 Flink 回滚到最近的 checkpoint;
-
pre-commit 完成,必须要确保 commit 也要成功,上图中的 Sink Operators 和 Kafka Sink 会共同来保证。
现状
目前 Flink 支持的精确一次 Source 列表如下表所示,你可以使用对应的 connector 来实现对应的语义要求:
数据源 | 语义保证 | 备注 |
---|---|---|
Apache Kafka | exactly once | 需要对应的 Kafka 版本 |
AWS Kinesis Streams | exactly once | |
RabbitMQ | at most once (v 0.10) / exactly once (v 1.0) | |
Twitter Streaming API | at most once | |
Collections | exactly once | |
Files | exactly once | |
Sockets | at most once |
如果你需要实现真正的“端到端精确一次语义”,则需要 sink 的配合。目前 Flink 支持的列表如下表所示:
写入目标 | 语义保证 | 备注 |
---|---|---|
HDFS rolling sink | exactly once | 依赖 Hadoop 版本 |
Elasticsearch | at least once | |
Kafka producer | at least once / exactly once | 需要 Kafka 0.11 及以上 |
Cassandra sink | at least once / exactly once | 幂等更新 |
AWS Kinesis Streams | at least once | |
File sinks | at least once | |
Socket sinks | at least once | |
Standard output | at least once | |
Redis sink | at least once |
总结
由于强大的异步快照机制和两阶段提交,Flink 实现了“端到端的精确一次语义”,在特定的业务场景下十分重要,我们在进行业务开发需要语义保证时,要十分熟悉目前 Flink 支持的语义特性。
这一课时的内容较为晦涩,建议你从源码中去看一下具体的实现。
点击这里下载本课程源码
第15讲:如何排查生产环境中的反压问题
这一课时我们主要讲解生产环境中 Flink 任务经常会遇到的一个问题,即如何处理好反压问题将直接关系到任务的资源使用和稳定运行。
反压问题是流式计算系统中经常碰到的一个问题,如果你的任务出现反压节点,那么就意味着任务数据的消费速度小于数据的生产速度,需要对生产数据的速度进行控制。通常情况下,反压经常出现在促销、热门活动等场景,它们有一个共同的特点:短时间内流量陡增造成数据的堆积或者消费速度变慢。
不同框架的反压对比
目前主流的大数据实时处理系统都对反压问题进行了专门的处理,希望框架自身能检测到被阻塞的算子,然后降低数据生产者的发送速率。我们所熟悉的 Storm、Spark Streaming、Flink 的实现稍微有所不同。
Storm
Storm 从 1.0 版本以后引入了全新的反压机制,Storm 会主动监控工作节点。当工作节点接收数据超过一定的水位值时,那么反压信息会被发送到 ZooKeeper 上,然后 ZooKeeper 通知所有的工作节点进入反压状态,最后数据的生产源头会降低数据的发送速度。
Spark Streaming
Spark Streaming 在原有的架构基础上专门设计了一个 RateController 组件,该组件利用经典的 PID 算法。向系统反馈当前系统处理数据的几个重要属性:消息数量、调度时间、处理时间、调度时间等,然后根据这些参数计算出一个速率,该速率则是当前系统处理数据的最大能力,Spark Streaming 会根据计算结果对生产者进行限速。
Flink
Flink 的反压设计利用了网络传输和动态限流。在 Flink 的设计哲学中,纯流式计算给 Flink 进行反压设计提供了天然的优势。
我们在以前的课程中讲解过,Flink 任务的组成由基本的“流”和“算子”构成,那么“流”中的数据在“算子”间进行计算和转换时,会被放入分布式的阻塞队列中。当消费者的阻塞队列满时,则会降低生产者的数据生产速度。
如上图所示,我们看一下 Flink 进行逐级反压的过程。当 Task C 的数据处理速度发生异常时,Receive Buffer 会呈现出队列满的情况,Task B 的 Send Buffer 会感知到这一点,然后把数据发送速度降低。以此类推,整个反压会一直从下向上传递到 Source 端;反之,当下游的 Task 处理能力有提升后,会在此反馈到 Source Task,数据的发送和读取速率都会升高,提高了整个任务的处理能力。
反压的定位
当你的任务出现反压时,如果你的上游是类似 Kafka 的消息系统,很明显的表现就是消费速度变慢,Kafka 消息出现堆积。
如果你的业务对数据延迟要求并不高,那么反压其实并没有很大的影响。但是对于规模很大的集群中的大作业,反压会造成严重的“并发症”。首先任务状态会变得很大,因为数据大规模堆积在系统中,这些暂时不被处理的数据同样会被放到“状态”中。另外,Flink 会因为数据堆积和处理速度变慢导致 checkpoint 超时,而 checkpoint 是 Flink 保证数据一致性的关键所在,最终会导致数据的不一致发生。
那么我们应该如何发现任务是否出现反压了呢?
Flink Web UI
Flink 的后台页面是我们发现反压问题的第一选择。Flink 的后台页面可以直观、清晰地看到当前作业的运行状态。
如上图所示,是 Flink 官网给出的计算反压状态的案例。需要注意的是,只有用户在访问点击某一个作业时,才会触发反压状态的计算。在默认的设置下,Flink 的 TaskManager 会每隔 50 ms 触发一次反压状态监测,共监测 100 次,并将计算结果反馈给 JobManager,最后由 JobManager 进行计算反压的比例,然后进行展示。
这个比例展示逻辑如下:
-
OK: 0 <= Ratio <= 0.10,正常;
-
LOW: 0.10 < Ratio <= 0.5,一般;
-
HIGH: 0.5 < Ratio <= 1,严重。
官网同样给出了不同反压状态下,Flink Web UI 中任务运行的状态,如下图所示:
Flink Metrics
如果你想对 Flink 做更为详细的监控的话,Flink 本身提供了大量的 REST API 来获取任务的各种状态。Flink 提供的所有系统监控指标你都点击这里找到。
随着版本的持续变更,截止 1.10.0 版本,Flink 提供的监控指标中与反压最为密切的如下表所示:
指标名称 | 用途 |
---|---|
outPoolUsage | 发送端缓冲池的使用率 |
inPoolUsage | 接收端缓冲池的使用率 |
floatingBuffersUsage | 处理节点缓冲池的使用率 |
exclusiveBuffersUsage | 数据输入方缓冲池的使用率 |
我们逐个介绍一下这四个指标。
-
outPoolUsage
这个指标代表的是当前 Task 的数据发送速率,当一个 Task 的 outPoolUsage 很高,则代表着数据发送速度很快。但是当一个 Task 的 outPoolUsage 很低,那么就需要特别注意,有可能是下游的处理速度很低导致的,也有可能当前节点就是反压节点,导致数据处理速度很慢。
-
inPoolUsage
inPoolUsage 表示当前 Task 的数据接收速率,通常会和 outPoolUsage 配合使用;如果一个节点的 inPoolUsage 很高而 outPoolUsage 很低,则这个节点很有可能就是反压节点。
-
floatingBuffersUsage 和 exclusiveBuffersUsage
floatingBuffersUsage 表示处理节点缓冲池的使用率;exclusiveBuffersUsage 表示数据输入通道缓冲池的使用率。
反压问题处理
我们已经知道反压产生的原因和监控的方法,当线上任务出现反压时,需要如何处理呢?
主要通过以下几个方面进行定位和处理:
-
数据倾斜
-
GC
-
代码本身
数据倾斜
数据倾斜问题是我们生产环境中出现频率最多的影响任务运行的因素,可以在 Flink 的后台管理页面看到每个 Task 处理数据的大小。当数据倾斜出现时,通常是简单地使用类似 KeyBy 等分组聚合函数导致的,需要用户将热点 Key 进行预处理,降低或者消除热点 Key 的影响。
GC
垃圾回收问题也是造成反压的因素之一。不合理的设置 TaskManager 的垃圾回收参数会导致严重的 GC 问题,我们可以通过 -XX:+PrintGCDetails 参数查看 GC 的日志。
代码本身
开发者错误地使用 Flink 算子,没有深入了解算子的实现机制导致性能问题。我们可以通过查看运行机器节点的 CPU 和内存情况定位问题。
总结
这一课时我们主要讨论了反压问题,涉及不同框架对反压问题的处理方式、Flink 任务反压的定位和如何处理。我们可以通过多种方式监控 Flink 任务的运行状态来定位反压问题,避免在生产环境中出现严重事故。
点击这里下载本课程源码
第16讲:如何处理生产环境中的数据倾斜问题
这一课时我们主要讲解如何处理生产环境中的数据倾斜问题。
无论是对于 Flink、Spark 这样的实时计算框架还是 Hive 等离线计算框架,数据量从来都不是问题,真正引起问题导致严重后果的是数据倾斜。所谓数据倾斜,是指在大规模并行处理的数据中,其中某个运行节点处理的数据远远超过其他部分,这会导致该节点压力极大,最终出现运行失败从而导致整个任务的失败。
我们在这一课时中将分析出现数据倾斜的原因,Flink 任务中最容易出现数据倾斜的几个算子并且给出解决方案。
数据倾斜背景和危害
数据倾斜产生的原因和危害和解决方案有哪些呢?我们一一来看。
数据倾斜原理
目前我们所知道的大数据处理框架,比如 Flink、Spark、Hadoop 等之所以能处理高达千亿的数据,是因为这些框架都利用了分布式计算的思想,集群中多个计算节点并行,使得数据处理能力能得到线性扩展。
我们在第 03 课时“Flink 的编程模型与其他框架比较”中曾经讲过,在实际生产中 Flink 都是以集群的形式在运行,在运行的过程中包含了两类进程。其中 TaskManager 实际负责执行计算的 Worker,在其上执行 Flink Job 的一组 Task,Task 则是我们执行具体代码逻辑的容器。理论上只要我们的任务 Task 足够多就可以对足够大的数据量进行处理。
但是实际上大数据量经常出现,一个 Flink 作业包含 200 个 Task 节点,其中有 199 个节点可以在很短的时间内完成计算。但是有一个节点执行时间远超其他结果,并且随着数据量的持续增加,导致该计算节点挂掉,从而整个任务失败重启。我们可以在 Flink 的管理界面中看到任务的某一个 Task 数据量远超其他节点。
数据倾斜原因和解决方案
Flink 任务出现数据倾斜的直观表现是任务节点频繁出现反压,但是增加并行度后并不能解决问题;部分节点出现 OOM 异常,是因为大量的数据集中在某个节点上,导致该节点内存被爆,任务失败重启。
产生数据倾斜的原因主要有 2 个方面:
-
业务上有严重的数据热点,比如滴滴打车的订单数据中北京、上海等几个城市的订单量远远超过其他地区;
-
技术上大量使用了 KeyBy、GroupBy 等操作,错误的使用了分组 Key,人为产生数据热点。
因此解决问题的思路也很清晰:
-
业务上要尽量避免热点 key 的设计,例如我们可以把北京、上海等热点城市分成不同的区域,并进行单独处理;
-
技术上出现热点时,要调整方案打散原来的 key,避免直接聚合;此外 Flink 还提供了大量的功能可以避免数据倾斜。
那么我们就从典型的场景入手,看看在 Flink 任务中出现数据倾斜的主要场景和解决方案。
Flink 任务数据倾斜场景和解决方案
两阶段聚合解决 KeyBy 热点
KeyBy 是我们经常使用的分组聚合函数之一。在实际的业务中经常会碰到这样的场景:双十一按照下单用户所在的省聚合求订单量最高的前 10 个省,或者按照用户的手机类型聚合求访问量最高的设备类型等。
上述场景在我们进行 KeyBy 时就会出现严重的数据倾斜,如下图所示:
如果我们直接简单地使用 KeyBy 算子,模拟一个简单的统计 PV 的场景如下:
DataStream sourceStream = ...;
windowedStream = sourceStream.keyBy("type")
.window(TumblingEventTimeWindows.of(Time.minutes(1)));
windowedStream.process(new MyPVFunction())
.addSink(new MySink())...
env.execute()...
我们在根据 type 进行 KeyBy 时,如果数据的 type 分布不均匀就会导致大量的数据分配到一个 task 中去,发生数据倾斜。
那么我们的解决思路是:
-
首先把分组的 key 打散,比如加随机后缀;
-
对打散后的数据进行聚合;
-
把打散的 key 还原为真正的 key;
-
二次 KeyBy 进行结果统计,然后输出。
DataStream sourceStream = ...;
resultStream = sourceStream
.map(record -> {
Record record = JSON.parseObject(record, Record.class);
String type = record.getType();
record.setType(type + "#" + new Random().nextInt(100));
return record;
})
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new CountAggregate())
.map(count -> {
String key = count.getKey.substring(0, count.getKey.indexOf("#"));
return RecordCount(key,count.getCount);
})
//二次聚合
.keyBy(0)
.process(new CountProcessFunction);
resultStream.sink()…
env.execute()…
其中 CountAggregate 函数实现如下:
public class CountAggregate implements AggregateFunction<Record,CountRecord,CountRecord> {
@Override
public CountRecord createAccumulator() {
return new CountRecord(null, 0L);
}
@Override
public CountRecord add(Record value, CountRecord accumulator) {
if(accumulator.getKey() == null){
accumulator.setKey(value.key);
}
accumulator.setCount(value.count);
return accumulator;
}
@Override
public CountRecord getResult(CountRecord accumulator) {
return accumulator;
}
@Override
public CountRecord merge(CountRecord a, CountRecord b) {
return new CountRecord(a.getKey(),a.getCount()+b.getCount()) ;
}
}
CountProcessFunction 的实现如下:
public class CountProcessFunction extends KeyedProcessFunction<String, CountRecord, CountRecord> {
private ValueState<Long> state = this.getRuntimeContext().getState(new ValueStateDescriptor("count",Long.class));
@Override
public void processElement(CountRecord value, Context ctx, Collector<CountRecord> out) throws Exception {
if(state.value()==0){
state.update(value.count);
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000L * 5);
}else{
state.update(state.value() + value.count);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<CountRecord> out) throws Exception {
//这里可以做业务操作,例如每 5 分钟将统计结果发送出去
//out.collect(...);
//清除状态
state.clear();
//其他操作
...
}
}
通过上面打散聚合再二次聚合的方式,我们就可以实现热点 Key 的打散,消除数据倾斜。
GroupBy + Aggregation 分组聚合热点问题
业务上通过 GroupBy 进行分组,然后紧跟一个 SUM、COUNT 等聚合操作是非常常见的。我们都知道 GroupBy 函数会根据 Key 进行分组,完全依赖 Key 的设计,如果 Key 出现热点,那么会导致巨大的 shuffle,相同 key 的数据会被发往同一个处理节点;如果某个 key 的数据量过大则会直接导致该节点成为计算瓶颈,引起反压。
我们还是按照上面的分组统计 PV 的场景,SQL 语句如下:
select
date,
type,
sum(count) as pv
from table
group by
date,
type;
我们可以通过内外两层聚合的方式将 SQL 改写为:
select date,
type,
sum(pv) as pv
from(
select
date,
type,
sum(count) as pv
from table
group by
date,
type,
floor(rand()*100) --随机打散成100份
)
group by
date,
type;
在上面的 SQL 拆成了内外两层,第一层通过随机打散 100 份的方式减少数据热点,当然这个打散的方式可以根据业务灵活指定。
Flink 消费 Kafka 上下游并行度不一致导致的数据倾斜
通常我们在使用 Flink 处理实时业务时,上游一般都是消息系统,Kafka 是使用最广泛的大数据消息系统。当使用 Flink 消费 Kafka 数据时,也会出现数据倾斜。
需要十分注意的是,我们 Flink 消费 Kafka 的数据时,是推荐上下游并行度保持一致,即 Kafka 的分区数等于 Flink Consumer 的并行度。
但是会有一种情况,为了加快数据的处理速度,来设置 Flink 消费者的并行度大于 Kafka 的分区数。如果你不做任何的设置则会导致部分 Flink Consumer 线程永远消费不到数据。
这时候你需要设置 Flink 的 Redistributing,也就是数据重分配。
Flink 提供了多达 8 种重分区策略,类图如下图所示:
在我们接收到 Kafka 消息后,可以通过自定义数据分区策略来实现数据的负载均衡,例如:
dataStream
.setParallelism(2)
// 采用REBALANCE分区策略重分区
.rebalance() //.rescale()
.print()
.setParallelism(4);
其中,Rebalance 分区策略,数据会以 round-robin 的方式对数据进行再次分区,可以全局负载均衡。
Rescale 分区策略基于上下游的并行度,会将数据以循环的方式输出到下游的每个实例中。
其他
Flink 一直在不断地迭代,不断出现各种各样的手段解决我们遇到的数据倾斜问题。例如,MiniBatch 微批处理手段等,需要我们开发者不断地去发现,并学习新的解决问题的办法。
总结
这一课时我们介绍了数据倾斜的原理和常见的解决方案,数据倾斜问题是大数据开发者遇到的最普遍也是最头疼的问题之一,如何高效地发现和解决数据倾斜问题是一个大数据从业者技术能力的直接体现。
点击这里下载本课程源码。
第17讲:生产环境中的并行度和资源设置
在使用 Flink 处理生产实际问题时,并行度和资源的配置调优是我们经常要面对的工作之一,如何有效和正确地配置并行度是我们的任务能够高效执行的必要条件。这一课时就来看一下生产环境的并行度和资源配置问题。
Flink 中的计算资源
通常我们说的 Flink 中的计算资源是指具体任务的 Task。首先要理解 Flink 中的计算资源的一些核心概念,比如 Slot、Chain、Task 等,正确理解这些概念有助于开发者了解 Flink 中的计算资源是如何进行隔离和管理的,也有助于我们快速地定位生产中的问题。
Task Slot
我们在第 03 课时“Flink 的编程模型与其他框架比较” 中提到过,在实际生产中,Flink 都是以集群在运行,在运行的过程中包含了两类进程,其中之一就是:TaskManager。
在 Flink 集群中,一个 TaskManger 就是一个 JVM 进程,并且会用独立的线程来执行 task,为了控制一个 TaskManger 能接受多少个 task,Flink 提出了 Task Slot 的概念。
我们可以简单地把 Task Slot 理解为 TaskManager 的计算资源子集。假如一个 TaskManager 拥有 5 个 Slot,那么该 TaskManager 的计算资源会被平均分为 5 份,不同的 task 在不同的 Slot 中执行,避免资源竞争。但需要注意的是,Slot 仅仅用来做内存的隔离,对 CPU 不起作用。那么运行在同一个 JVM 的 task 可以共享 TCP 连接,以减少网络传输,在一定程度上提高了程序的运行效率,降低了资源消耗。
Slot 共享
默认情况下,Flink 还允许同一个 Job 的子任务共享 Slot。因为在一个 Flink 任务中,有很多的算子,这些算子的计算压力各不相同,比如简单的 map 和 filter 算子所需要的资源不多,但是有些算子比如 window、group by 则需要更多的计算资源才能满足计算所需。这时候那些资源需求大的算子就可以共用其他的 Slot,提高整个集群的资源利用率。
Operator Chain
此外,Flink 自身会把不同的算子的 task 连接在一起组成一个新的 task。这么做是因为 Flink 本身提供了非常有效的任务优化手段,因为 task 是在同一个线程中执行,那么可以有效减少线程间上下文的切换,并且减少序列化/反序列化带来的资源消耗,从而在整体上提高我们任务的吞吐量。
并行度
Flink 使用并行度来定义某一个算子被切分成多少个子任务。我们的 Flink 代码会被转换成逻辑视图,在实际运行时,根据用户的并行度配置会被转换成对应的子任务进行执行。
源码解析
Flink Job 在执行中会通过 SlotProvider 向 ResourceManager 申请资源,RM 负责协调 TaskManager,满足 JobManager 的资源请求。
整体的类图如上图所示,SlotProvider 中的 allocateSlot 方法负责向 SlotPool 申请可用的 slot 资源,通过 returnLogicSlot 将空闲的 slot 释放至 SlotPool。
在整个 Flink 的资源管理的类中,核心的类包括 Scheduler、SlotPool、JobMaster。它们之间的交互流程主要包括:Scheduler 调度器会向 SlotPool 资源池申请和释放 Slot;如果 SlotPool 不能满足需求,那么会向 ResourceManager 发起申请;获取到的资源通过 JobMaster 分配给 SlotPool。
关于更多的资源管理的实现流程,可以参考上面的类图查看源码。
如何设置并行度
Flink 本身支持不同级别来设置我们任务并行度的方法,它们分别是:
- 算子级别
- 环境级别
- 客户端级别
- 集群配置级别
下面依次讲解 Flink 中的四种并行度的设置方法,以及它们的优先级。
算子级别
我们在编写 Flink 程序时,可以在代码中显示的制定不同算子的并行度。用经典的 wordcount 程序举例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = ...
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.setParallelism(10)
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(1);
wordCounts.print();
env.execute("word count");
如上述代码所示,我们可以通过显示的调用 setParallelism() 方法来显示的指定每个算子的并行度配置。
在实际生产中,我们推荐在算子级别显示指定各自的并行度,方便进行显示和精确的资源控制。
环境级别
环境级别的并行度设置指的是我们可以通过调用 env.setParallelism() 方法来设置整个任务的并行度:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
...
一旦设置了这个参数,表明我们任务中的所有算子的并行度都是指定的值,生产环境中并不推荐。
客户端级别
我们可以在使用命令提交 Flink Job 的时候指定并行度,当任务执行时发现代码中没有设置并行度,便会采用我们提交命令时的参数。
通过 -p 命令来指定任务提交时候的并行度:
./bin/flink run -p 5 ../wordCount-java*.jar
集群配置级别
我们的 flink-conf.yaml 文件中有一个参数 parallelism.default,该参数会在用户不设置任何其他的并行度配置时生效:
需要特别指出的是,设置并行度的优先级依次是:
算子级别 > 环境级别 > 客户端级别 > 集群配置级别
我们推荐在生产环境中使用算子级别的并行度进行资源控制。
总结
本课时我们讲解了 Flink 中和资源相关的几个重要概念,并且讲解了并行度设置的四种方法,我们在生产环境中的并行度设置是经过多次调优得出的。通过本课时的学习,你将会了解 Flink 中的并行度设置方法,并且能在生产环境中正确设置并行度。
点击这里下载本课程源码。
第18讲:如何进行生产环境作业监控
本课时主要讲解如何进行生产环境作业监控。
在第 15 课时“如何排查生产环境中的反压问题”中提到过我们应该如何发现任务是否出现反压,Flink 的后台页面是我们发现反压问题的第一选择,其后台页面可以直观、清晰地看到当前作业的运行状态。
在实际生产中,Flink 的后台页面可以方便我们对 Flink JobManager、TaskManager、执行计划、Slot 分配、是否反压等参数进行定位,对单个任务来讲可以方便地进行问题排查。
但是,对于很多大中型企业来讲,我们对进群的作业进行管理时,更多的是关心作业精细化实时运行状态。例如,实时吞吐量的同比环比、整个集群的任务运行概览、集群水位,或者监控利用 Flink 实现的 ETL 框架的运行情况等,这时候就需要设计专门的监控系统来监控集群的任务作业情况。
Flink Metrics
针对上面的情况,我们就用了 Flink 提供的另一个强大的功能:Flink Metrics。
Flink Metrics 是 Flink 实现的一套运行信息收集库,我们不但可以收集 Flink 本身提供的系统指标,比如 CPU、内存、线程使用情况、JVM 垃圾收集情况、网络和 IO 等,还可以通过继承和实现指定的类或者接口打点收集用户自定义的指标。
通过使用 Flink Metrics 我们可以轻松地做到:
-
实时采集 Flink 中的 Metrics 信息或者自定义用户需要的指标信息并进行展示;
-
通过 Flink 提供的 Rest API 收集这些信息,并且接入第三方系统进行展示。
Flink Metrics 分类
Flink 提供了四种类型的监控指标,分别是:Counter、Gauge、Histogram、Meter。
Counter
Counter 称为计数器,一般用来统计其中一个指标的总量,比如统计数据的输入、输出总量。
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("MyCounter");
}
@Override
public String map(String value) throws Exception {
this.counter.inc();
return value;
}
}
Gauge
Gauge 被用来统计某一个指标的瞬时值。举个例子,我们在监控 Flink 中某一个节点的内存使用情况或者某个 map 算子的输出值数量。
public class MyMapper extends RichMapFunction<String, String> {
private transient int valueNumber = 0L;
@Override
public void open(Configuration config) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Long>() {
@Override
public Long getValue() {
return valueNumber;
}
});
}
@Override
public String map(String value) throws Exception {
valueNumber++;
return value;
}
}
Meter
Meter 被用来计算一个指标的平均值。
public class MyMapper extends RichMapFunction<Long, Integer> {
private Meter meter;
@Override
public void open(Configuration config) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
}
@public Integer map(Long value) throws Exception {
this.meter.markEvent();
}
}
Histogram
Histogram 是直方图,Flink 中属于直方图的指标非常少,它通常被用来计算指标的最大值、最小值、中位数等。
public class MyMapper extends RichMapFunction<Long, Integer> {
private Histogram histogram;
@Override
public void open(Configuration config) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
}
@public Integer map(Long value) throws Exception {
this.histogram.update(value);
}
}
这里需要特别指出的是,Flink 中的 Metrics 是一个多层的结构,以 Group 的方式存在,我们用来定位唯一的一个 Metrics 是通过 Metric Group + Metric Name 的方式。
源码分析
Flink Metrics 相关的实现都是通过 org.apache.flink.metrics.Metric 这个类实现的,整体的类图如下所示:
为了方便对 Metrics 进行管理和分类,Flink 提供了对 Metrics 进行分组的功能,这个功能是通过下图中 MetricGroup 实现的,在图中可以看到 MetricGroup 相关的子类的继承关系。
此外,Flink 还提供了向外披露 Metric 的监测结果的接口,该接口是 org.apache.flink.metrics.reporter.MetricReporter。这个接口的实现类通过 Metrics 的类型进行注册和移除。
public abstract class AbstractReporter implements MetricReporter, CharacterFilter {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
protected final Map<Gauge<?>, String> gauges = new HashMap();
protected final Map<Counter, String> counters = new HashMap();
protected final Map<Histogram, String> histograms = new HashMap();
protected final Map<Meter, String> meters = new HashMap();
public AbstractReporter() {
}
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
String name = group.getMetricIdentifier(metricName, this);
synchronized(this) {
if(metric instanceof Counter) {
this.counters.put((Counter)metric, name);
} else if(metric instanceof Gauge) {
this.gauges.put((Gauge)metric, name);
} else if(metric instanceof Histogram) {
this.histograms.put((Histogram)metric, name);
} else if(metric instanceof Meter) {
this.meters.put((Meter)metric, name);
} else {
this.log.warn("Cannot add unknown metric type {}. This indicates that the reporter does not support this metric type.", metric.getClass().getName());
}
}
}
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
synchronized(this) {
if(metric instanceof Counter) {
this.counters.remove(metric);
} else if(metric instanceof Gauge) {
this.gauges.remove(metric);
} else if(metric instanceof Histogram) {
this.histograms.remove(metric);
} else if(metric instanceof Meter) {
this.meters.remove(metric);
} else {
this.log.warn("Cannot remove unknown metric type {}. This indicates that the reporter does not support this metric type.", metric.getClass().getName());
}
}
}
}
获取 Metrics
获取 Metrics 的方法有多种,首先我们可以通过 Flink 的后台管理页面看到部分指标;其次可以通过 Flink 提供的 Http 接口查询 Flink 任务的状态信息,因为 Flink Http 接口返回的都是 Json 信息,我们可以很方便地将 Json 进行解析;最后一种方法是,我们可以通过 Metric Reporter 获取。下面分别对这两者进行详细讲解。
Flink HTTP 接口
Flink 提供了丰富的接口来协助我们查询 Flink 中任务运行的状态,所有的请求都可以通过访问
http://hostname:8081/ 加指定的 URI 方式查询,Flink 支持的所有 HTTP 接口你都可以点击这里查询到。
Flink 支持的接口包括:
/config
/overview
/jobs
/joboverview/running
/joboverview/completed
/jobs/<jobid>
/jobs/<jobid>/vertices
/jobs/<jobid>/config
/jobs/<jobid>/exceptions
/jobs/<jobid>/accumulators
/jobs/<jobid>/vertices/<vertexid>
/jobs/<jobid>/vertices/<vertexid>/subtasktimes
/jobs/<jobid>/vertices/<vertexid>/taskmanagers
/jobs/<jobid>/vertices/<vertexid>/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
/jobs/<jobid>/plan
/jars/upload
/jars
/jars/:jarid
/jars/:jarid/plan
/jars/:jarid/run
举个例子,我们可以通过查询 /joboverview 访问集群中所有任务的概览,结果类似如下形式:
{
"running":[],
"finished":[
{
"jid": "7684be6004e4e955c2a558a9bc463f65",
"name": "Flink Java Job at Wed Sep 16 18:08:21 CEST 2015",
"state": "FINISHED",
"start-time": 1442419702857,
"end-time": 1442419975312,
"duration":272455,
"last-modification": 1442419975312,
"tasks": {
"total": 6,
"pending": 0,
"running": 0,
"finished": 6,
"canceling": 0,
"canceled": 0,
"failed": 0
}
},
{
"jid": "49306f94d0920216b636e8dd503a6409",
"name": "Flink Java Job at Wed Sep 16 18:16:39 CEST 2015",
...
}]
}
Flink Reporter
Flink 还提供了很多内置的 Reporter,这些 Reporter 在 Flink 的官网中可以查询到。
例如,Flink 提供了 Graphite、InfluxDB、Prometheus 等内置的 Reporter,我们可以方便地对这些外部系统进行集成。关于它们的详细配置也可以在 Flink 官网的详情页面中看到。
这里我们举一个 Flink 和 InfluxDB、Grafana 集成进行 Flink 集群任务监控的案例。在这个监控系统中,InfluxDB 扮演了 Flink 中监控数据存储者的角色,Grafana 则扮演了数据展示者的角色。
-
InfluxDB 的安装
InfluxDB 的安装过程很简单,我们不在这里赘述了,需要注意的事项是修改 InfluxDB 的配置 /etc/influxdb/influxdb.conf:
[admin]
enabled = true
bind-address = ":8083"
我们就可以通过 8083 端口打开 InfluxDB 的控制台了。
-
Grafana 的安装
安装可以直接点击这里参考官网说明,Grafana 的默认账号和密码分别是 admin、admin,可以通过 3000 端口进行访问。
-
修改 flink-conf.yaml
我们需要在 flink 的配置文件中新增以下配置:
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: xxx.xxx.xxx.xxx
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
同时,将 flink-metrics-influxdb-1.10.0.jar 这个包复制到 flink 的 /lib 目录下,然后启动 Flink。
我们就可以在 Grafana 中看到 Metrics 信息了。
事实上,常用的 Flink 实时监控大盘包括但不限于:Prometheus+Grafana、Flink 日志接入 ELK等可以供用户选择。结合易用性、稳定性和接入成本,综合考虑,我们推荐实际监控中可以采用 Prometheus/InfluxDB+Grafana 相配合的方式。
总结
这一课时我们讲解了 Flink Metrics 指标的分类,并且从源码层面介绍了 Flink Metrics 的实现原理,最后还讲解了 Flink 监控这个 Metrics 的方式。我们在实际生产中应该灵活选取合适的监控方法,搭建自己的 Flink 任务监控大盘。
点击这里下载本课程源码