0
点赞
收藏
分享

微信扫一扫

使用k8s daemonset控制器部署fluentd收集日志发送至kafka/s3/logstash

萧萧雨潇潇 2023-05-20 阅读 25

daemonset文件

cat > fluentbit-daemonset.yml <<eof
apiVersion: apps/v1
kind: DaemonSet
metadata:
    name: fluentbit
    labels:
        k8s-app: fluentbit
        version: v8
        kubernetes.io/cluster-service: "true"
spec:
    selector:
        matchLabels:
          k8s-app: fluentbit
          version: v1
    updateStrategy:
        type: RollingUpdate
    template:
        metadata:
            labels:
                k8s-app: fluentbit
                version: v1
                kubernetes.io/cluster-service: "true"
        spec:
            containers:
              - name: fluentbit
                image: public.ecr.aws/aws-observability/aws-for-fluent-bit:latest
                imagePullPolicy: Always
                command: ["/fluent-bit/bin/fluent-bit","-c", "/fluent-bit/etc/fluentbit.conf"]
                env:
                - name: NODE_NAME
                  valueFrom:
                    fieldRef:
                        fieldPath: spec.nodeName
                - name: MY_POD_NAME
                  valueFrom:
                    fieldRef:
                        fieldPath: metadata.name
                - name: MY_POD_NAMESPACE
                  valueFrom:
                    fieldRef:
                        fieldPath: metadata.namespace
                - name: MY_POD_IP
                  valueFrom:
                    fieldRef:
                        fieldPath: status.podIP
                resources:
                    requests:
                        cpu: 5m
                        memory: 20Mi
                    limits:
                        cpu: 60m
                        memory: 60Mi
                volumeMounts:
                - name: varlog
                  mountPath: /var/log
                - name: logs
                  mountPath: /data/logic/log/log
                  readOnly: true
                - name: fluentbit-config
                  mountPath: /fluent-bit/etc/
            serviceAccountName: my-service-account
            terminationGracePeriodSeconds: 10
            volumes:
                - name: varlog
                  hostPath:
                    path: /var/log
                - name: logs
                  hostPath:
                    path: /data/logic/log/log
                - name: fluentbit-config
                  configMap:
                    name: fluentbit-config
eof                    

configmap文件

 cat > fluentbit-config.yml <<eof
 apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentbit-config
  labels:
    k8s-app: fluentbit
data:
  fluentbit.conf: |
    [SERVICE]
        Flush         1
        Log_Level     info
        Daemon        off

	[INPUT]
		Name              tail
		Tag               test-log-1
		Path              /data/logic/log/log/*.log
		DB                /var/log/testlog.db
		Mem_Buf_Limit     5MB
		Skip_Long_Lines   On
		Refresh_Interval  10 

	[INPUT]
		Name              tail
		Tag               test-log-2
		Path              /data/logs/*.log
		DB                /var/log/testlog.db
		Mem_Buf_Limit     5MB
		Skip_Long_Lines   On
		Refresh_Interval  10 

	[OUTPUT]
		Name                          kafka
		Match                         test-log-1
		brokers                       <KAFKA_SERVER>:<KAFKA_PORT>
		message_key                   message
		topic                         fluent-bit-logs-1
		type                          test-log-1
		time_key                      @timestamp
		logstash_format               On
		retry_limit                   False

	[OUTPUT]
		Name                          kafka
		Match                         test-log-2
		brokers                       <KAFKA_SERVER>:<KAFKA_PORT>
		message_key                   message
		topic                         fluent-bit-logs-2
		type                          test-log-2
		time_key                      @timestamp
		logstash_format               On
		retry_limit                   False

	[OUTPUT]
		Name                          logstash
		Match                         test-log-1,test-log-2
		host                          <LOGSTASH_HOST>:<LOGSTASH_PORT>
		protocol                      tcp
		port                          514
		logstash_prefix               fluent-bit
		logstash_dateformat           %Y%m%d
		flush_interval                5s

  [OUTPUT]
    Name                          s3
    Match                         test-log-1
    bucket                        lava-log-bucket
    region                        us-west-2
    store_dir                     /home
    total_file_size               10M
    upload_timeout                1m
    s3_key_format                 /fluentbit-logs/test-log/year=%Y/month=%m/day=%d/%H-%M-%S

	[FILTER]
		Name                grep
		Match               test-log-1
		Regex               (error|fail|invalid)

	[FILTER]
		Name                grep
		Match               test-log-2
		Regex               (error|fail|invalid)

	[OUTPUT]
		Name                          webhook
		Match                         test-log-1,test-log-2
		url                           https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY
		method                        post
		headers                       Content-Type: application/json
		message_key                   message
		format                        json
		json_date_format              iso8601
		json_map                      {"text": "${escaped_tag} ${escaped_message}"}
 eof

参数解释

  • fluentbit.conf:ConfigMap 的一个 key,表示在下面的 value 部分中定义 Fluent Bit 的配置文件内容。
  • value:Fluent Bit 的配置文件内容。
  • [SERVICE]:服务端配置部分。包括 Flush 阈值、日志级别、守护进程模式等参数。
  • Flush:指定缓冲区刷新阈值,单位为秒。
  • Log_Level:指定日志记录的级别,可以设置为 info、debug、trace 等。
  • Daemon:是否以守护进程模式运行 Fluent Bit。
  • [INPUT]:输入插件部分。定义了从指定文件路径读取日志记录的规则和条件。
  • Name:指定使用的输入插件类型,这里为 tail。
  • Tag:标记输入插件所读取的日志记录,并用于后续输出插件的匹配。
  • Path:指定从哪些文件路径读取日志记录。
  • DB:指定存储持久化状态的数据库文件路径。
  • Mem_Buf_Limit:指定使用的内存缓冲区大小。
  • Skip_Long_Lines:指定是否跳过超过缓冲区大小的长行数据。
  • Refresh_Interval:指定刷新配置文件的时间间隔。
  • [OUTPUT]:输出插件部分。定义了将日志记录发送到目标位置(Kafka、logstash、S3、webhook 等)的规则和条件。
  • Name:指定使用的输出插件类型。
  • Match:标记要发送给该输出插件的日志记录的标签。
  • brokers:指定 Kafka 服务器的地址和端口号。
  • message_key:指定输出消息中包含的关键字。
  • topic:指定写入 Kafka 的主题名称。
  • type:指定输出消息的类型。
  • time_key:指定从记录中提取时间戳的关键字。
  • logstash_format:指定是否将输出格式为 Logstash JSON 格式。
  • retry_limit:指定尝试重试失败消息的次数。
  • host:指定 logstash 服务器的 IP 地址或主机名。
  • protocol:指定协议类型,这里为 TCP。
  • port:指定 logstash 服务器使用的端口号。
  • logstash_prefix:指定在 Logstash 中生成的索引名称的前缀。
  • logstash_dateformat:指定日期格式。
  • flush_interval:指定刷新缓冲区的时间间隔。
  • bucket:指定 S3 存储桶名称。
  • region:指定 S3 存储桶所在的地理位置。
  • store_dir:指定本地存储目录路径。
  • total_file_size:指定本地存储文件的最大大小。
  • upload_timeout:指定上传到 S3 的超时时间。
  • s3_key_format:指定用于生成 S3 存储键的格式。
  • [FILTER]:过滤器部分。定义了在发送到输出插件之前对日志记录进行过滤和
  • [FILTER]:这个部分定义了 Fluentd 中的过滤器,用于筛选、转换和修改输入的数据。该例子中使用了两次过滤器,分别为 "grep"。
  • Name:这里指定了当前使用的过滤器名称,即 "grep"。
  • Match:这里指定了需要匹配的标签(tag),即 test-log-1 和 test-log-2。
  • Regex:这里指定了需要匹配的正则表达式,即匹配包含字符串 "error"、"fail" 或 "invalid" 的日志信息。
  • [OUTPUT]:这个部分定义了 Fluentd 中的输出插件,用于将处理后的数据输出到指定的目的地。该例子中使用了一个输出插件,即 "webhook"。
  • url:这里指定了 webhook 接收数据的地址,即 https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY。
  • method:这里指定了 webhook 的请求方法,即 post。
  • headers:这里指定了请求头信息,即 Content-Type: application/json。
  • message_key:这里指定了要发送的消息键名,即 message。
  • format:这里指定了要发送的数据格式,即 json。
  • json_date_format:这里指定了输出的 JSON 数据中日期格式的设置,即将日期格式设置为 ISO 8601。这将在 JSON 格式的数据中添加一个时间戳字段,并使用 ISO 8601 格式进行格式化。
  • json_map:这里指定了要发送到 webhook 的 JSON 数据的结构。在这个例子中,JSON 数据包含一个 "text" 键和对应的值,其中 "${escaped_tag} 和{escaped_message}" 分别代表了日志的标签和消息内容。注意,这里使用了 "${escaped_tag} {escaped_message}",表示会对日志信息中的特殊字符进行转义以确保可靠性。
  • 在这个例子中,"json_map" 指定了要发送到 Webhook 的 JSON 数据结构,并将 "${escaped_tag} 
    {escaped_message}" 插入到 "text" 字段中作为键值对。在实际使用中,当 Fluentd 处理日志数据时,它会填充这些预定义的变量来生成实际的 JSON 数据。

日志测试

at testapp.yml 
apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-deployment
  labels:
    app: test
spec:
  selector:
    matchLabels:
      app: test
  replicas: 2
  template:
    metadata:
      labels:
        app: test
    spec:
      containers:
      - name: createlog
        image: busybox
        imagePullPolicy: Always
        command: [ "sh", "-c"]
        args:
        - i=0;
          while true; 
            do
              echo "[`date +%Y/%m/%d-%H:%M:%S`]"+$i+$HOSTNAME >> /data/logic/log/log/test-$HOSTNAME.log;
              let i=i+1; 
              sleep 2;
            done
        volumeMounts:
        - name: logs
          mountPath: /data/logic/log/log
      volumes:
      - name: logs
        hostPath:
          path: /data/logic/log/log


在 EKS 中使用 Fluent Bit 收集应用日志并保存到 S3 中的方案

http://www.zlprogram.com/Show/55/A6B28897.shtml

举报

相关推荐

0 条评论