0
点赞
收藏
分享

微信扫一扫

加密流量分类torch实践3:TrafficClassificationPandemonium项目分享

静鸡鸡的JC 03-05 07:00 阅读 3

在这里插入图片描述

背景介绍

在实际生产环境中,当请求激增时,kafka 生产者发送的消息数量会远远大于 kafka 消费者的消费能力,从而导致消息堆积和处理延迟。为了避免此种情况,就要求消费者能够感知到 kafka 消息堆积,并通过动态增加或减少自身的副本数,实现动态自适应消费,这就是本文即将介绍的内容,即基于 kafka_consumergroup_lag 指标实现 Consumer Pod 水平弹性伸缩。

Kubernetes 通过 HPA 实现 Pod 的水平弹性伸缩,默认支持多种类型,包括 Resource、Pods、Object、External、ContainerResource。有关 HPA 的更多官方介绍请参考官方文档:https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/,本文不作冗余说明。

由于 kafka_consumergroup_lag 指标不是从待扩缩容的消费者 Pod 上采集上来的,没有与 K8s 资源对象建立关联关系,因此这里需要使用 External 类型的 HPA。

整体架构图如下:
在这里插入图片描述

实验步骤

1. 准备 Kubernetes 集群

本文使用 Kind 创建一个测试集群, 准备如下配置文件,命名为 kind-cluster.yaml:

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
  image: kindest/node:v1.28.0@sha256:b7a4cad12c197af3ba43202d3efe03246b3f0793f162afb40a33c923952d5b31
- role: worker
  image: kindest/node:v1.28.0@sha256:b7a4cad12c197af3ba43202d3efe03246b3f0793f162afb40a33c923952d5b31
- role: worker
  image: kindest/node:v1.28.0@sha256:b7a4cad12c197af3ba43202d3efe03246b3f0793f162afb40a33c923952d5b31

执行如下命令,创建 Kind 集群:

kind create cluster --config kind-cluster.yaml

2. 安装部署 Kafka 组件

这里使用 Helm 一键安装:

helm repo add kafka-repo https://helm-charts.itboon.top/kafka
helm repo update kafka-repo
helm upgrade --install kafka \
  --namespace kafka \
  --create-namespace \
  --set broker.combinedMode.enabled="true" \
  --set broker.persistence.enabled="false" \
  kafka-repo/kafka

注意,此种安装关闭了持久化存储,单实例最小化运行,仅用于测试环境。

3. 部署 Prometheus 监控组件

git clone https://github.com/prometheus-operator/kube-prometheus.git
cd kube-prometheus
kubectl create -f manifests/setup
kubectl create -f manifests/

通过端口转发,访问 Prometheus 看板:

kubectl port-forward service/prometheus-k8s 9090:9090 -n monitoring

执行上述端口转发命令后,浏览器访问 http://localhost:9090
在这里插入图片描述

4. 部署 Kafka Exporter 指标采集器

准备如下 kafka-exporter.yaml 文件:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-exporter
  namespace: monitoring
  labels:
    app: kafka-exporter
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-exporter
  template:
    metadata:
      labels:
        app: kafka-exporter
    spec:
      containers:
      - name: kafka-exporter
        image: danielqsj/kafka-exporter:v1.6.0
        imagePullPolicy: IfNotPresent
        args: ["--kafka.server=kafka-headless.kafka:9092"]
        ports:
        - containerPort: 9308
          name: http
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-exporter
  name: kafka-exporter
  namespace: monitoring
spec:
  type: ClusterIP
  ports:
  - name: http
    port: 9308
    protocol: TCP
    targetPort: 9308
  selector:
    app: kafka-exporter

执行如下命令安装:

kubectl apply -f kafka-exporter.yaml 

5. 配置 Prometheus 采集 Kafka Exporter 的数据

本文通过创建 ServiceMonitor 实现,准备如下 kafka-service-monitor.yaml 文件:

---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  labels:
    app: kafka-exporter
  name: prometheus-kafka-exporter
  namespace: monitoring
spec:
  endpoints:
    - honorLabels: true
      interval: 1m
      path: /metrics
      port: http
      scheme: http
      params:
        target:
          - 'kafka-headless.kafka:9092'
      relabelings:
        - sourceLabels: [__param_target]
          targetLabel: instance
  namespaceSelector:
    matchNames:
      - monitoring
  selector:
    matchLabels:
      app: kafka-exporter

执行如下命令,创建 ServiceMonitor:

kubectl apply -f kafka-service-monitor.yaml 

6. 创建测试 Topic

通过 kubectl exec 进入 kafka 容器,执行 bin/kafka-topics.sh 命令创建 Topic:

kubectl exec -it -n kafka kafka-broker-0 bash

bin/kafka-topics.sh --bootstrap-server kafka-headless.kafka:9092 \
   --create --topic custom-topic \
   --replication-factor 1 \
   --partitions 3

7. 部署 kafka 消费者

准备如下 kafka-consumer.yaml 文件:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-kafka-go-client
spec:
  replicas: 1
  selector:
    matchLabels:
      lang: golang
      kafka: consumer
  template:
    metadata:
      labels:
        lang: golang
        kafka: consumer
    spec:
      containers:
      - name: consumer-kafka-go-client
        image: shidaqiu/kafka-client:1.1
        command:
        - ./consumer
        - kafka-headless.kafka:9092
        - custom-topic
        - golang-consumer
        - "100"
        # WaitMs
        - "2000"
        - plaintext
        resources:
          limits:
            cpu: 50m
            memory: 300Mi

执行如下命令创建:

kubectl apply -f kafka-consumer.yaml 

8. 部署 kafka 生产者

准备如下 kafka-producer.yaml 文件:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer-kafka-go-client
spec:
  replicas: 1
  selector:
    matchLabels:
      lang: golang
      kafka: producer
  template:
    metadata:
      labels:
        lang: golang
        kafka: producer
    spec:
      containers:
      - name: producer-kafka-go-client
        image: shidaqiu/kafka-client:1.1
        command:
        - ./producer
        - kafka-headless.kafka:9092
        - custom-topic
        - "100"
        - "10000"
        - none
        - "1000"
        # WaitMs
        - "2000"
        - plaintext

执行如下命令创建:

kubectl apply -f kafka-producer.yaml 

9. 验证 Prometheus 采集到了相关指标

登陆 Prometheus 看板,检查 kafka_consumergroup_lag 指标被成功采集

在这里插入图片描述

10. 部署 Prometheus Adaptor

首先,下载 helm chart 包到本地

helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
helm pull --untar prometheus-community/prometheus-adapter

然后,编辑其 values.yaml 文件,修改参数,包括设置 Prometheus URL 地址,以及增加 kafka_consumergroup_lag 的指标转换规则,示例如下:

prometheus:
  # Value is templated
  url: http://prometheus-k8s.monitoring
  port: 9090
  path: ""

rules:
  external: 
  - seriesQuery: '{topic!="", __name__=~"kafka_consumergroup_lag"}'
    resources:
      template: <<.Resource>>
    name:
      as: "hpa_kafka_consumergroup_lag"
    metricsQuery: sum(min_over_time(kafka_consumergroup_lag{<< range $key, $value :=.LabelValuesByName >><< if ne $key "namespace" >><< $key >>="<< $value >>",<<end >><< end >>}[1h])) by (topic,consumergroup)

最后,使用新配置,部署 Prometheus Adaptor:

helm upgrade --install prometheus-adaptor . -n monitoring

部署完成后,可以通过如下命令验证指标采集和转换无误:

kubectl get --raw "/apis/external.metrics.k8s.io/v1beta1/namespaces/monitoring/hpa_kafka_consumergroup_lag" | jq .

输入类似如下内容,则表示工作正常:

{
  "kind": "ExternalMetricValueList",
  "apiVersion": "external.metrics.k8s.io/v1beta1",
  "metadata": {},
  "items": [
    {
      "metricName": "hpa_kafka_consumergroup_lag",
      "metricLabels": {
        "consumergroup": "golang-consumer",
        "topic": "custom-topic"
      },
      "timestamp": "2024-03-02T14:26:15Z",
      "value": "0"
    }
  ]
}

11. 配置 HPA 规则

准备 consumer-hpa.yaml 文件:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: consumer-kafka-go-client-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: consumer-kafka-go-client
  minReplicas: 1
  maxReplicas: 3
  metrics:
  - type: External
    external:
      metric: 
        name: hpa_kafka_consumergroup_lag
        selector:
          matchLabels:
            topic: custom-topic
      target:
        type: Value
        value: "300"

执行如下命令,应用 HPA:

kubectl apply -f consumer-hpa.yaml

12. 施加压力,观察 Pod 扩容

通过如下命令扩容 producer 实例数量

kubectl scale deploy/producer-kafka-go-client --replicas 2

可以观察到当 hpa_kafka_consumergroup_lag 超过 300 时,能够触发消费者实例扩容:

在这里插入图片描述

本文相关源码:https://github.com/SataQiu/kafka_metrics_hpa

引用文献:
  • https://medium.com/@roman.noze/kubernetes-pods-autoscaling-with-kafka-metrics-9b7d5ec3c1d3
举报

相关推荐

0 条评论