背景
线上Flink 1.10版本集群运行了近半年后(standalone方式部署在k8s上),发现经常oom,导致k8s pod经常性超过内存限制而重启。
经过一段时间分析发现了相关问题决定升级到1.12。在此记录下事件过程中的点点滴滴
问题现象
集群运行一段时间后POD重启次数不断增加,多者可以达到几十次
分析问题过程
理论上如果没有内存泄漏,按照flink 官方说明的参数配置java进程的内存应该是限制在一个范围内的。就算偶尔数量大可能造成内存不足但也不至于如此频繁的oom(差不多一个任务起来后两三天必出现oom)。
-
第一步怀疑自己参数配置错误:
查看了程序启动参数,分析参数设置是否有问题,是否按照taskmanager的内存模型分配的内存。分析flink各部分内存占用情况(我们一个pod有8G分4个slot,processSize:7200M).中间还特意去看了下flink 1.10后内存是如何分配的(
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html) 。内存使用情况如下:
jcmd 看相关的内存占用也比较正常
题外话:其实这个内存已经有个excel工具可以计算,只不过我开始没发现(flink社区的文档还是相当的强大的,很多问题可以从邮件组、jira中发现有别人提出过类似的问题)。
经过比对,确认配置上、堆内存并没有什么问题。只能继续考虑其他的了——native内存。首先进入我们考虑之列的是rocksdb。
网上也有rocksdb占用native大的说法,而且native内存也不受管控很容易出现内存使用过多而不知道。于是我们增加了rocksdb的相关metrics。观察指标并没有异样。其实对rocksdb的内存,观察指标会发现几个对应的blockcache指标在不同机器,不同任务都基本上一样的。这主要是目前flink 在多个slot间共享的cache以及write buffer manager。基本上是state.backend.rocksdb.memory.managed的2/3
详细信息可以看:https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/state_backends.html#incremental-checkpoints
-
排除了我们最怀疑的点rocksdb,只能分析其他的原因了。刚好网上找到一片文章介绍k8s oom的 详解 Flink 容器化环境下的 OOM Killed 。看完之后感觉现象和我们的有点像,之前也经过pmap命令分析过,有很多64M左右的连续内存块分配。于是打算按照这个原因测试验证一下, 在测试环境验证,加了环境变量MALLOC_ARENA_MAX 发现连续64M的内存分配消失了。这样总算是定位到具体问题。
关于glibc 分配内存造成泄漏的问题,官方issues也有相关的issue讨论,而且在1.12版本对这些内存泄漏情况做了不少优化:
1、childfirstclassloader 增加了泄漏检查
2、去掉了glibc的分配方式改用jemalloc最终我们的临时方案是:设置MALLOC_ARENA_MAX=2,减少问题出现的频率。
修改重启集群之后确实pod重启的次数少了很多,差不多稳定运行了两三天。又发现有重启问题了,不过这次主要是metaspace oom导致程序无法于行,pod健康检查失败重启了。dump出来taskmanager的内存分析后发现主要是否我们有个批量的定时任务会隔半个小时启动一次,由于flink 采用的是childFirstClassloader方式加载,同时我们底层使用cat做相关监控的时候由于thread context classloader 问题,导致任务关闭后classloader任然无法释放没法垃圾回收掉。
现象就是jmap dump出来的内存分析中有很多重复被加载的类。久而久之不断的积累,metaspace就达到了我们的上限400M。这可真是一波刚平一波又起,解决一个问题冒出来另外一个,问题总比想象的多。
经过再三考虑权衡,我们打算升级到flink 1.12版本,同时使用flink on 8s native的方式,这样我们可以减少JOB互相影响(定时任务重启后自动销毁pod)。看了下官方文档,本以为很简单,但过程永远没想想的那么简单。
升级过程
- 查看flink 1.11/1.12的release notes,看看有哪些点修改了,对我们的任务是否有影响
- 查看1.11,1.12官方建议升级步骤
按照文档一步一步来,本来以为万事大吉。但也还是出现了些坑。-
kafka connector 改了,我们1.10代码自定义一个反序列化的wrapper,继承自KafkaDeserializationSchemaWrapper 。原先是重写了
public T deserialize(ConsumerRecord<byte[], byte[]> record) 方法,可是新的代码中不是这样的,kafkaFetcher直接通过public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) 方法反序列化,收集数据。这个问题导致我们测试的时候莫名其妙,奇怪于为什么数据没有按照我们写的代码处理。(这个问题可能对直接使用1.12的人没啥,但对升级过来的就会觉得很奇怪了,等于是原来调用A方法,现在变成了调用B方法了)protected void partitionConsumerRecordsHandler( List<ConsumerRecord<byte[], byte[]>> partitionRecords, KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception { for (ConsumerRecord<byte[], byte[]> record : partitionRecords) { deserializer.deserialize(record, kafkaCollector); // emit the actual records. this also updates offset state atomically and emits // watermarks emitRecordsWithTimestamps( kafkaCollector.getRecords(), partition, record.offset(), record.timestamp()); if (kafkaCollector.isEndOfStreamSignalled()) { // end of stream signaled running = false; break; } } }
cep api修改。在原来的版本中默认的是数据处理是processing Time,但是1.12开始默认的是event Time,由于我们的cep代码在原来的基础上做了些定制,只好看看关联的定制代码是否变更,然后按照新的方式使用PatternStram.inProcessingTime()的方式设置processing Time
-
Alink结合问题。我们原先用的是session clsuter,alink相关的任务都打包到fat jar中,不会有关联的依赖。Alink和flink1.12结合后按照官方的文档需要设置classloader.resolve-order: parent-first 的方式将父类加载器优先,而着和flink中默认的child-first是相背的。开始的时候我们以为只要你过打个fat jar就能正常跑起来了,可一跑,发现反序列化的时候找不到相应的类。
从错误信息看: 主要是通过appClassloader加载类com.alibaba.alink.common.linalg.DenseVector 无法找到。这个类在fat jar里面,只能通过childFirstClassLoader加载到,appClassLoader自然是无法找到了,看样子只能把这个类放到镜像里面了。于是想着把alink-core放到lib目录,试了一把还是有问题,发现缺少其他的一些依赖内容。实在没办法了只能去github看看alink官方的部署文档,官方是通过pyalink进行部署的,说是里面的jar和java 的是通用的,于是本地通过pyton部署了一把,获得相应的jar。发现这个jar有50M左右,而我本地的alink-core只有6M,看来是本地maven下载的不是fat jar,缺少相应的以来jar包。只好用这个50多M的包替换,然后我不想用parent-first 的方式进行类加载,毕竟我的集群还要运行其他任务。只能折中下吧alink-core里面相关的目录放到classloader.parent-first-patterns.additional配置里面。这次算是终于大功告成了。
- Alink job 启动报错如下:Cannot have more than one execute() or executeAsync() call in a single environment:1、Cannot have more than one execute() or executeAsync() call in a single environment
这是错误的原因是从1.11开始,通过rest提交任务后会检查在一个env中提交任务执行次数(也就是execute和executeAsync执行次数)(FLINK-16657), alink中的env是通过MLEnvironmentFacory获得的,每次获得后放到static类型的map中。我们是在session cluster运行,任务第一次提交后得到的是default env,第二次提交还是同一个env,这样就会报错了(MLEnvironmentFacory 是通过parentClassLoader加载的,自然在多次任务中一直存在不会消失) 后面解决方式是通过反射在任务启动的时候将map中内容清空
-
- 机器配置修改。原来我们的集群是4C8G,每个taskmanager4个slot,为了避免任务间互相影响,我们改为1C2G的taskmanager,每个tm 1个slot。运行1天后发现2G的内存在我们任务中确实有点少,容易heap space 不足导致oom。只好改为3G。 内存分配的参数优化这块只能根据不同的任务使用场景自行去考虑了。
- savepoint迁移。原先我们使用nas挂载的方式持久化保存状态数据的,但是因为k8s nantive 暂时还不支持增加Persistent Volume 配置,所以没法设置挂载对应的nas。我们只得想其他办法实现。最后综合考虑公司现有资源,使用阿里云的oss当作持久化状态保存的分布式系统。新任务顺利的启动了,但是最后有状态的而且状态很重要的任务因为savepoint的_metadata中包含了绝对路径无法迁移。官网寻找答案然后网上找寻了一圈资料,发现有其他人也有类似的问题,最后依靠下面的文章寻到了解决方案:
Move Flink Savepoint to a different S3 location。
核心思想是读取savepoint替换掉_metadata中的绝对路径,如果想迁移checkpoint的话也是类似的做法,可以github下载作者代码做些修改即可
总结
经过一段时间的发现问题,解决问题之路。最终1.12算是平稳迁移成功了。在此对想升级使用flink的提些建议:
- flink官方文档还是挺全的,建议是不是的可以看看开发问题,jira中的issues。如果有问题也可以在用户邮件组和这些地方搜索查找
- 可以把flink最新分之下载下来,如果发现一些异常bug根据关键字在其中搜索,看看前后的源码,分析定位原因
- 最后一点就是多做验证测试。问题可能就出现在你不经意的地方。