背景
在Kubernetes系统中,Kubelet通过参考Pod的QoS等级来管理单机容器的资源质量,例如OOM(Out of Memory)优先级控制等。Pod的QoS级别分为Guaranteed、Burstable和BestEffort。
- Guaranteed: Requests和Limits 资源相同,是最优先的,只有系统超过限制资源才会被杀死
- Burstable:Limits 大于 Requests ,优先级次于Guaranteed,在没有足够资源且没有BestEffort时被杀死
- BestEffort: 没有Limits 和 Requests 限制,在资源不足时最先被杀死
在生产环境中经常遇到如下情况,总资源量大于节点的实际使用量,节点的实际使用量在一定范围内波动,这就造成了总资源量中始终有一小部分资源未被使用,影响了资源的利用率。
根据Pod的QoS级别可以将任务提交为Burstable 和 BestEffort 以获得较高的利用率,在实际使用上BestEffort 约束不足,可能造成节点负载高但仍会调度BestEffort pod的问题。
Flink 支持资源超卖
Flink 在1.15.0 版本之前只支持CPU 的资源申请,没有内存的申请,FLINK-15648 支持了内存和CPU 的限制,同时也支持了Burstable 的配置。
资源配置增加了下几个参数
kubernetes.jobmanager.cpu.limit-factor
kubernetes.jobmanager.memory.limit-factor
kubernetes.taskmanager.cpu.limit-factor
kubernetes.taskmanager.memory.limit-factor
kubernetes.requests.jobmanager.mem
kubernetes.requests.taskmanager.mem
kubernetes.requests.jobmanager.cpu
kubernetes.requests.taskmanager.cpu
Requests 直接配置,Limits = Requests * limit-factor
源码的修改
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java 增加四个requests 的配置参数,而不是使用原有的默认参数
public static final ConfigOption<Double> JOB_MANAGER_REQUEST_CPU =
key("kubernetes.requests.jobmanager.cpu")
.doubleType()
.defaultValue(-1.0)
.withDescription("The number of cpu requested by job manager");
public static final ConfigOption<Double> TASK_MANAGER_REQUEST_CPU =
key("kubernetes.requests.taskmanager.cpu")
.doubleType()
.defaultValue(-1.0)
.withDescription("The number of cpu requested by task manager. By default, the cpu is set "
+ "to the number of slots per TaskManager");
public static final ConfigOption<MemorySize> JOB_MANAGER_REQUEST_MEM =
key("kubernetes.requests.jobmanager.mem")
.memoryType()
.defaultValue(new MemorySize(1024))
.withDescription("The number of cpu requested by job manager");
public static final ConfigOption<MemorySize> TASK_MANAGER_REQUEST_MEM =
key("kubernetes.requests.taskmanager.mem")
.memoryType()
.defaultValue(new MemorySize(1024))
.withDescription(
"The number of cpu requested by task manager. By default, the cpu is set "
+ "to the number of slots per TaskManager");
给新参数增加默认值
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
public double getJobManagerRequestCPU() {
if (flinkConfig.getDouble(KubernetesConfigOptions.JOB_MANAGER_REQUEST_CPU) < 0) {
return this.getJobManagerCPU();
}
return flinkConfig.getDouble(KubernetesConfigOptions.JOB_MANAGER_REQUEST_CPU);
}
public int getJobManagerRequestMemoryMB() {
if (flinkConfig.get(KubernetesConfigOptions.JOB_MANAGER_REQUEST_MEM).getMebiBytes() == 0) {
return this.getJobManagerMemoryMB();
}
return flinkConfig.get(KubernetesConfigOptions.JOB_MANAGER_REQUEST_MEM).getMebiBytes();
}
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java
public int getTaskManagerRequestMemoryMB() {
if (flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_REQUEST_MEM).getMebiBytes() == 0) {
return this.getTaskManagerMemoryMB();
}
return flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_REQUEST_MEM).getMebiBytes();
}
public double getTaskManagerRequestCPU() {
if (flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_REQUEST_CPU) < 0) {
return this.getTaskManagerCPU();
}
return flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_REQUEST_CPU);
}
修改 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
使用 kubernetesJobManagerParameters.getJobManagerRequestMemoryMB() 等新参数替换老参数 kubernetesTaskManagerParameters.getTaskManagerMemoryMB()
使用注意事项
request 值可直接由kubernetes.requests.XXX.XX 设置,可随意调整大小,但有必须遵循一下关系
taskmanager/jobmanager.memory.process.size >= requests
taskmanager/jobmanager.memory.process.size <= requests * limit-factor
在这个区间内可随意控制
kubernetes.requests.jobmanager/taskmanager.mem 默认 1G
kubernetes.requests.jobmanager/taskmanager.cpu 默认使用jobmanager/taskmanager 自己的配置
实际使用
以session 启动为例,只需要在启动脚本中添加参数即可,下面是启动超卖的情况
kubernetes_cluster_id="flink-14-4-3-1-test" && \
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=${kubernetes_cluster_id} \
-Dkubernetes.taskmanager.cpu=1 \
-Dkubernetes.container.image=xxx.cn/1.14.4.3.1-scala_2.11 \
-Dkubernetes.service-account=flink \
-Denv.java.opts="-Dlog4j2.formatMsgNoLookups=true" \
-Dtaskmanager.slot.timeout="30 s" \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dkubernetes.jobmanager.cpu.limit-factor=2 \
-Dkubernetes.jobmanager.memory.limit-factor=2 \
-Dkubernetes.taskmanager.cpu.limit-factor=2 \
-Dkubernetes.taskmanager.memory.limit-factor=2 \
-Dkubernetes.requests.jobmanager.mem=1500m \
-Dkubernetes.requests.taskmanager.mem=1500m \
-Djobmanager.memory.process.size=2800m \
-Dtaskmanager.memory.process.size=1728m \
-Drest.flamegraph.enabled=true
上面参数kubernetes.requests.jobmanager.mem 和 kubernetes.requests.jobmanager.mem 都是1500M ,也就是说requests 参数为1500M
kubernetes.taskmanager.memory.limit-factor=2 且 kubernetes.jobmanager.memory.limit-factor=2 则是说明 Limit 值 为requests*2 ,也就是3000M
上面的jobmanager 和 taskmanager 实际是启用了超用的, 1500m(requests) < 2800m (jobmanager 总内存占用) < 3000m (Limit) ,1500m(requests) < 1728m (taskmanager 总内存占用) < 3000m (Limit) ,极端环境可以使 =jobmanager 总内存占用=taskmanager=Limit ,占用较多的资源。
若是不想超用则 requests=jobmanager 总内存占用=taskmanager 总内存占用 即可,这里要注意limit-factor 的设置 ,集群资源少可以减少倍率降低对总资源的占用,集群资源多可增大该值以提高空闲资源的占用。
不启动超卖的情况
kubernetes_cluster_id="flink-14-4-3-1-test" && \
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=${kubernetes_cluster_id} \
-Dkubernetes.taskmanager.cpu=1 \
-Dkubernetes.container.image=xxx.cn/1.14.4.3.1-scala_2.11 \
-Dkubernetes.service-account=flink \
-Denv.java.opts="-Dlog4j2.formatMsgNoLookups=true" \
-Dtaskmanager.slot.timeout="30 s" \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dkubernetes.jobmanager.cpu.limit-factor=1 \
-Dkubernetes.jobmanager.memory.limit-factor=1 \
-Dkubernetes.taskmanager.cpu.limit-factor=1 \
-Dkubernetes.taskmanager.memory.limit-factor=1 \
-Dkubernetes.requests.jobmanager.mem=1500m \
-Dkubernetes.requests.taskmanager.mem=1500m \
-Djobmanager.memory.process.size=1500m \
-Dtaskmanager.memory.process.size=1500m \
-Drest.flamegraph.enabled=true
limit-factor 的值必须都大于1 ,这样 requests=jobmanager 总内存占用=taskmanager 总内存占用 = Limit 也可以这样设置,但是没多余的弹性空间
使用场景
在测试与开发环境中,我们希望能够使用尽可能少的资源,同时获得最大的性能和效益。在这种情况下,我们可以使用Flink资源超卖策略,提高测试和开发效率,同时减少系统资源占用。
例如:
测试环境只剩1G 的内存空间,但任务需要2G 的空间来测试,而实际的使用达不到2G时可以使用该功能优化,具体配置参数参考参考以下配置
-Dkubernetes.taskmanager.memory.limit-factor=2
-Dkubernetes.requests.taskmanager.mem=1g
-Dtaskmanager.memory.process.size=1g
此配置可能会导致OOM ,生产等长时间的任务可适当减少limit-factor 这个因子,提高稳定性。
临时高CPU 占用但其他任务滴CPU使用率的任务,那么我们可以调高cpu 的实际占用率,来达到多占用CPU的目的,对未超卖的任务没有影响。
总结
当kubernetes集群中某个节点上可用资源比较小时,kubernetes提供了资源回收策略来保证节点上POD正常运行,资源超卖的任务pod 会被停止。重要的任务不要配置超卖。普通任务按重要程度CPU分为2,5,10三个超卖标准,10倍超卖适合访问量不高。(也就是超卖比越高,说明服务优先级越低),内存一般使用固定的1.5倍超卖标准(可依据实际的使用量调整)。