0
点赞
收藏
分享

微信扫一扫

Flink on k8s 增加资源超卖

dsysama 2023-10-19 阅读 40

背景

在Kubernetes系统中,Kubelet通过参考Pod的QoS等级来管理单机容器的资源质量,例如OOM(Out of Memory)优先级控制等。Pod的QoS级别分为Guaranteed、Burstable和BestEffort。

Flink on k8s 增加资源超卖_java

  • Guaranteed: Requests和Limits 资源相同,是最优先的,只有系统超过限制资源才会被杀死
  • Burstable:Limits 大于 Requests ,优先级次于Guaranteed,在没有足够资源且没有BestEffort时被杀死
  • BestEffort: 没有Limits 和 Requests 限制,在资源不足时最先被杀死

在生产环境中经常遇到如下情况,总资源量大于节点的实际使用量,节点的实际使用量在一定范围内波动,这就造成了总资源量中始终有一小部分资源未被使用,影响了资源的利用率。

Flink on k8s 增加资源超卖_flink_02

根据Pod的QoS级别可以将任务提交为Burstable 和 BestEffort 以获得较高的利用率,在实际使用上BestEffort 约束不足,可能造成节点负载高但仍会调度BestEffort pod的问题。


Flink 支持资源超卖

Flink 在1.15.0 版本之前只支持CPU 的资源申请,没有内存的申请,FLINK-15648 支持了内存和CPU 的限制,同时也支持了Burstable 的配置。


资源配置增加了下几个参数

kubernetes.jobmanager.cpu.limit-factor
kubernetes.jobmanager.memory.limit-factor
kubernetes.taskmanager.cpu.limit-factor
kubernetes.taskmanager.memory.limit-factor

kubernetes.requests.jobmanager.mem
kubernetes.requests.taskmanager.mem
kubernetes.requests.jobmanager.cpu
kubernetes.requests.taskmanager.cpu

Requests 直接配置,Limits = Requests * limit-factor


源码的修改

flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java 增加四个requests 的配置参数,而不是使用原有的默认参数

public static final ConfigOption<Double> JOB_MANAGER_REQUEST_CPU =
            key("kubernetes.requests.jobmanager.cpu")
                    .doubleType()
                    .defaultValue(-1.0)
                    .withDescription("The number of cpu requested by job manager");

    public static final ConfigOption<Double> TASK_MANAGER_REQUEST_CPU =
            key("kubernetes.requests.taskmanager.cpu")
                    .doubleType()
                    .defaultValue(-1.0)
                    .withDescription("The number of cpu requested by task manager. By default, the cpu is set "
                            + "to the number of slots per TaskManager");

    public static final ConfigOption<MemorySize> JOB_MANAGER_REQUEST_MEM =
            key("kubernetes.requests.jobmanager.mem")
                    .memoryType()
                    .defaultValue(new MemorySize(1024))
                    .withDescription("The number of cpu requested by job manager");

    public static final ConfigOption<MemorySize> TASK_MANAGER_REQUEST_MEM =
            key("kubernetes.requests.taskmanager.mem")
                    .memoryType()
                    .defaultValue(new MemorySize(1024))
                    .withDescription(
                            "The number of cpu requested by task manager. By default, the cpu is set "
                                    + "to the number of slots per TaskManager");

给新参数增加默认值

flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java

public double getJobManagerRequestCPU() {
        if (flinkConfig.getDouble(KubernetesConfigOptions.JOB_MANAGER_REQUEST_CPU) < 0) {
            return this.getJobManagerCPU();
        }
        return flinkConfig.getDouble(KubernetesConfigOptions.JOB_MANAGER_REQUEST_CPU);
    }

    public int getJobManagerRequestMemoryMB() {
        if (flinkConfig.get(KubernetesConfigOptions.JOB_MANAGER_REQUEST_MEM).getMebiBytes() == 0) {
            return this.getJobManagerMemoryMB();
        }
        return flinkConfig.get(KubernetesConfigOptions.JOB_MANAGER_REQUEST_MEM).getMebiBytes();
    }

flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java

public int getTaskManagerRequestMemoryMB() {
        if (flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_REQUEST_MEM).getMebiBytes() == 0) {
            return this.getTaskManagerMemoryMB();
        }
        return flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_REQUEST_MEM).getMebiBytes();
    }

    public double getTaskManagerRequestCPU() {
        if (flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_REQUEST_CPU) < 0) {
            return this.getTaskManagerCPU();
        }
        return flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_REQUEST_CPU);
    }

修改 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java

使用 kubernetesJobManagerParameters.getJobManagerRequestMemoryMB() 等新参数替换老参数 kubernetesTaskManagerParameters.getTaskManagerMemoryMB()

使用注意事项

request 值可直接由kubernetes.requests.XXX.XX 设置,可随意调整大小,但有必须遵循一下关系

taskmanager/jobmanager.memory.process.size  >= requests 

taskmanager/jobmanager.memory.process.size <= requests * limit-factor

在这个区间内可随意控制 

kubernetes.requests.jobmanager/taskmanager.mem 默认 1G 
 kubernetes.requests.jobmanager/taskmanager.cpu 默认使用jobmanager/taskmanager 自己的配置

实际使用

以session 启动为例,只需要在启动脚本中添加参数即可,下面是启动超卖的情况

kubernetes_cluster_id="flink-14-4-3-1-test" && \
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=${kubernetes_cluster_id} \
  -Dkubernetes.taskmanager.cpu=1 \
  -Dkubernetes.container.image=xxx.cn/1.14.4.3.1-scala_2.11 \
  -Dkubernetes.service-account=flink \
  -Denv.java.opts="-Dlog4j2.formatMsgNoLookups=true" \
  -Dtaskmanager.slot.timeout="30 s" \
  -Dkubernetes.rest-service.exposed.type=NodePort \
  -Dkubernetes.jobmanager.cpu.limit-factor=2 \
  -Dkubernetes.jobmanager.memory.limit-factor=2 \
  -Dkubernetes.taskmanager.cpu.limit-factor=2 \
  -Dkubernetes.taskmanager.memory.limit-factor=2 \
  -Dkubernetes.requests.jobmanager.mem=1500m \
  -Dkubernetes.requests.taskmanager.mem=1500m \
  -Djobmanager.memory.process.size=2800m \
  -Dtaskmanager.memory.process.size=1728m \
  -Drest.flamegraph.enabled=true

上面参数kubernetes.requests.jobmanager.mem 和 kubernetes.requests.jobmanager.mem  都是1500M  ,也就是说requests 参数为1500M 

kubernetes.taskmanager.memory.limit-factor=2 且 kubernetes.jobmanager.memory.limit-factor=2 则是说明 Limit 值 为requests*2 ,也就是3000M 

上面的jobmanager 和 taskmanager 实际是启用了超用的,  1500m(requests) <  2800m (jobmanager 总内存占用) < 3000m (Limit)  ,1500m(requests) <  1728m (taskmanager 总内存占用) < 3000m (Limit)  ,极端环境可以使  =jobmanager 总内存占用=taskmanager=Limit ,占用较多的资源。

若是不想超用则 requests=jobmanager 总内存占用=taskmanager 总内存占用 即可,这里要注意limit-factor 的设置 ,集群资源少可以减少倍率降低对总资源的占用,集群资源多可增大该值以提高空闲资源的占用。

不启动超卖的情况

kubernetes_cluster_id="flink-14-4-3-1-test" && \
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=${kubernetes_cluster_id} \
  -Dkubernetes.taskmanager.cpu=1 \
  -Dkubernetes.container.image=xxx.cn/1.14.4.3.1-scala_2.11 \
  -Dkubernetes.service-account=flink \
  -Denv.java.opts="-Dlog4j2.formatMsgNoLookups=true" \
  -Dtaskmanager.slot.timeout="30 s" \
  -Dkubernetes.rest-service.exposed.type=NodePort \
  -Dkubernetes.jobmanager.cpu.limit-factor=1 \
  -Dkubernetes.jobmanager.memory.limit-factor=1 \
  -Dkubernetes.taskmanager.cpu.limit-factor=1 \
  -Dkubernetes.taskmanager.memory.limit-factor=1  \
  -Dkubernetes.requests.jobmanager.mem=1500m \
  -Dkubernetes.requests.taskmanager.mem=1500m \
  -Djobmanager.memory.process.size=1500m \
  -Dtaskmanager.memory.process.size=1500m \
  -Drest.flamegraph.enabled=true

limit-factor 的值必须都大于1 ,这样  requests=jobmanager 总内存占用=taskmanager 总内存占用 = Limit 也可以这样设置,但是没多余的弹性空间

使用场景

在测试与开发环境中,我们希望能够使用尽可能少的资源,同时获得最大的性能和效益。在这种情况下,我们可以使用Flink资源超卖策略,提高测试和开发效率,同时减少系统资源占用。

例如:

测试环境只剩1G 的内存空间,但任务需要2G 的空间来测试,而实际的使用达不到2G时可以使用该功能优化,具体配置参数参考参考以下配置

-Dkubernetes.taskmanager.memory.limit-factor=2

-Dkubernetes.requests.taskmanager.mem=1g

-Dtaskmanager.memory.process.size=1g

此配置可能会导致OOM ,生产等长时间的任务可适当减少limit-factor 这个因子,提高稳定性。

临时高CPU 占用但其他任务滴CPU使用率的任务,那么我们可以调高cpu 的实际占用率,来达到多占用CPU的目的,对未超卖的任务没有影响。

总结

当kubernetes集群中某个节点上可用资源比较小时,kubernetes提供了资源回收策略来保证节点上POD正常运行,资源超卖的任务pod 会被停止。重要的任务不要配置超卖。普通任务按重要程度CPU分为2,5,10三个超卖标准,10倍超卖适合访问量不高。(也就是超卖比越高,说明服务优先级越低),内存一般使用固定的1.5倍超卖标准(可依据实际的使用量调整)。


举报

相关推荐

0 条评论