今日内容:
     - kafka生产环境调优;
     - kafka配合ElasticStack技术栈的搭配使用;
     - zookeeper集群部署;
     - zookeeper的ACL;
     - zookeeper的调优;
     - PB级别项目;
     - ES8集群搭建/elk; (待定...)
     
 订阅1个的topic:
     老男孩: 10
     
 多个不同的主题分配如下: (linux82-elk: 3)
     c1: 0 3 6 9
     c2: 1 4 7
     c3: 2 5 8
 订阅多个的topic:
     老男孩: 10 t1
     Linux: 10 t2 
     Python: 10  t3
     
 多个不同的主题分配如下: (linux82-elk: 3)
     c1: t1_0 ...  t2_0 ...  t3_0 ...
     c2: t1_1 ...  t2_1 ...
     c3: t1_2  ...  t2_2 ...
     
     
     
     
订阅1个的topic:
     老男孩: 10
     
 多个不同的主题分配如下: (linux82-elk: 3)
     c1: 0 1 2
     c2: 3 4 5
     c3: 6 7 8 9
     
 10 / 3 = 3  .. 1 ===> 
 999 / 100 = 9 .. 99
JBOD ---> RAID 0
 ---> RAID
BOND ---> ...
 注意的参数:
     log.dirs
     auto.create.topics.enable
     zookeeper.connect
     num.io.threads
     
     
 ElasticStack集成kafka实战案例:
     1.创建topic
 kafka-topics.sh --bootstrap-server 10.0.0.103:9092 --create --topic oldboyedu-linux82-kafka-000001
    2.使用filebeat收集日志到kafka集群:
 cat > config/28-nginx-to-kafka.yaml <<EOF
 filebeat.inputs:
 - type: log
   paths:
     - /var/log/nginx/access.log*
   json.keys_under_root: true
 output.kafka:
   # 写入kafka集群的地址
   hosts: 
   - 10.0.0.102:9092
   - 10.0.0.103:9092
  # 写入集群的topic
   topic: "oldboyedu-linux82-kafka-000001"
 EOF
     3.使用logstash收集kafka日志
 cat > config/18-kafka-to-es.conf <<EOF
 input { 
    kafka {
      # 指定kafka的集群
      bootstrap_servers => "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092"
      # 从哪个topic消费数据
      topics => ["oldboyedu-linux82-kafka-000001"]
      # 指定消费者组
      group_id => "oldboyedu-linux82-logstash"
    }
 } 
 filter {
   json {
     source => "message"
     remove_field => ["tags","@version","ecs","agent","input","message"]
   }
   
   geoip {
      source => "clientip"
   }
  date {
      match => [
         "@oldboyedu-timestamp",
         "yyyy-MM-dd'T'HH:mm:ssZ"
      ]
   }
  useragent {
     source => "http_user_agent"
     target => "oldboyedu-linux82-useragent"
   }
  
 }
output { 
   stdout {}
  elasticsearch {
       hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
       index => "oldboyedu-linux82-project-kafka"
       user => "elastic"
       password => "123456"
   }
 }
 EOF
    
     
 zookeeper集群部署:
     1.创建zookeeper的数据目录
 install -d /oldboyedu/data/zk
 data_rsync.sh /oldboyedu/data/zk/
     2.修改单点zk的配置文件
 vim /oldboyedu/softwares/zk/conf/zoo.cfg                          
 ...
 # 定义最小单元的时间范围tick。
 tickTime=2000
 # 启动时最长等待tick数量。
 initLimit=5
 # 数据同步时最长等待的tick时间进行响应ACK
 syncLimit=2
 # 指定数据目录
 dataDir=/oldboyedu/data/zk
 # 监听端口
 clientPort=2181
 # 开启四字命令允许所有的节点访问。
 4lw.commands.whitelist=*
 # server.ID=A:B:C[:D]
 # ID:
 #    zk的唯一编号。
 # A:
 #    zk的主机地址。
 # B:
 #    leader的选举端口,是谁leader角色,就会监听该端口。
 # C: 
 #    数据通信端口。
 # D:
 #    可选配置,指定角色。
 server.101=10.0.0.101:2888:3888
 server.102=10.0.0.102:2888:3888
 server.103=10.0.0.103:2888:3888
    3.同步数据
 data_rsync.sh /oldboyedu/softwares/apache-zookeeper-3.8.0-bin/
     
     
     4.创建myid文件
 for ((host_id=101;host_id<=103;host_id++)) do ssh 10.0.0.${host_id} "echo ${host_id} > /oldboyedu/data/zk/myid";done
     
     5.所有节点启动zk服务
 zkServer.sh start
 zkServer.sh  status
    
     6.链接方式
 zkCli.sh -server 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181/oldboyedu-linux82-kafka3.2.1
     
 zookeeper的leader选举流程:    
     myid:
         唯一标识一个zookeeper节点。
     zxid:
         唯一事务的标识。用于记录唯一的写操作!
     
     选举流程:
         (1)相比较zxid,若zxid较大,则会成为新的leader;
         (2)如果zxid比较不出来,则比较myid,myid较大者会有限成为新的leader;
     
     
 使用zkWeb管理zookeeper集群:
     1.运行zkWeb
 java -jar zkWeb-v1.2.1.jar &>/dev/null &
    2.访问webUI
 略。
临时znode:
     当前的会话退出时,znode会默认等待30秒后自动消失,等待时间是可以修改的哟。
     
 持久的znode:
     不会随着客户端的退出而删除znode。
 docker
     









