一.zookeeper简介
在部署ELK(Elasticsearch, Logstash, Kibana)集群时,通常需要一个Zookeeper集群来提供分布式协调服务。ZooKeeper官方地址,ZooKeeper官方指导文档地址。
ZooKeeper是一个开源的分布式协调服务,主要用于分布式应用程序的协调和管理。它提供高可用、高性能、高可靠性的服务,能够帮助开发人员在分布式环境中实现任务分配、状态同步、配置管理、分布式锁和队列等功能,从而构建高度可靠和可扩展的分布式系统。ZooKeeper的核心概念是znode,这是一种类似于文件和目录的数据结构,用于存储和管理数据。ZooKeeper将所有znode存储在一个分层的命名空间中,并提供了读取、写入、创建和删除等操作。 ZooKeeper在大数据技术和分布式系统中有着广泛的应用,例如在Hadoop、HBase、Hive等项目中都有使用。它基于Fast Paxos算法实现同步服务、配置维护和命名服务等分布式应用。
ZooKeeper的基本运转流程:
1.选举Leader。
2.同步数据。
3.选举Leader过程中算法有很多,但要达到的选举标准是一致的。
4.Leader要具有最高的执行ID,类似root权限。
5.集群中大多数的机器得到响应并接受选出的Leader。
#server.ID=A:B:C[:D]
#ID:zk的唯一编号,范围1-255,与myid的值要一样
#A:zk的主机地址
#B:leader选举端口,谁是leader角色,就会监听该端口,默认2888
#C:数据通信端口,默认3888
#D:可选配置,指定角色
ZooKeeper的状态:Leader、Follower和Observer
Leader状态:
定义:Leader是整个ZooKeeper集群的主节点,负责处理所有写请求和协调事务一致性。它是通过选举产生的,负责更新系统状态和维护集群的顺序性。
角色:Leader负责发起投票和决议,处理所有事务请求,并确保集群中的所有写操作都由其调度和处理。
Follower状态:
定义:Follower是集群中的从节点,处理客户端的非事务请求,并将事务请求转发给Leader。它们参与Leader选举过程。
角色:Follower接受客户端的读写请求,但不处理写请求。它们的主要作用是同步数据和参与选举过程,确保集群的稳定性和一致性。
Observer状态:
定义:Observer节点可以接受客户端的读写请求,但写请求会被转发给Leader。它们不参与投票过程,主要用于扩展系统的读取吞吐量。
角色:Observer不参与投票和事务提交,主要目的是提高集群的读取性能和扩展性。它们同步Leader的状态,但不参与决策过程。
选举过程和状态转换 集群节点半数以上存活集群才正常
在ZooKeeper中,Leader的选举是通过ZAB协议进行的。选举过程包括以下几个步骤:
投票:每个服务器发出一个投票,包含其myid和zxid(事务ID)。
处理投票:服务器接收并处理来自其他服务器的投票,根据zxid和myid进行比较。
统计投票:统计投票结果,优先级zxid>myid,集群半数以上选举出Leader后,就不再比较zxid和myid,即使他们比Leader大。
改变状态:当选的服务器成为Leader,其他服务器根据投票结果成为Follower或Observer。
通过这些状态和选举机制,ZooKeeper确保了数据的一致性和集群的稳定性。
二.kafka简介
Apache Kafka是一个开源的分布式事件流平台,主要用于处理大规模数据流。它提供了高吞吐量的消息发布和订阅机制,适用于实时日志收集、事件流处理等场景。
Kafka的核心功能
消息发布和订阅:
Kafka允许生产者(Producer)将消息发布到Kafka broker,消费者(Consumer)则从broker读取消息。
每个消费者可以属于一个特定的消费者组(Consumer Group),这种机制支持多个消费者并行消费消息,并且允许动态增加或删除消费者,从而实现高效的消息传递。
持久化日志:
Kafka使用持久化日志来存储消息,确保数据不丢失。每个消息都会被追加到一个可配置的持久化日志文件中,并且持久化到磁盘。
这种机制保证了即使在生产者和消费者之间发生故障时,消息也不会丢失。
分布式架构:
Kafka设计为分布式系统,集群中的数据被分割成多个分区,分布在多个服务器节点上。
这种架构使得Kafka具有高可用性和可扩展性,能够处理大量数据并且可以容易地扩展以适应不断增长的负载。
容错性:
Kafka提供了副本机制来保证数据的可靠性和容错性。每个分区可以配置多个副本,分布在不同的服务器节点上。
当某个副本失效时,Kafka会自动选择其他副本来继续提供服务,从而确保数据不丢失并保证服务的可用性。
Kafka的应用场景
实时日志收集:
Kafka能够高效地处理和存储大量的日志数据,适用于各种需要实时日志收集的场景。
事件流处理:
由于其高吞吐量和容错性,Kafka非常适合用于事件流处理,能够实时处理和分析大量的事件数据。
消息队列:
Kafka提供了消息队列的功能,支持生产者和消费者之间的异步通信,帮助平衡生产者和消费者之间的速度差异,提高系统的稳定性和可靠性。
三.ZooKeeper二进制部署
1.ZooKeeper二进制单节点部署
#下载二进制软件包
cd /es/softwares
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
#解压
tar xf apache-zookeeper-3.8.4-bin.tar.gz
#创建软连接
ln -s /es/softwares/apache-zookeeper-3.8.4-bin zk
#配置zk环境变量
cat >/etc/profile.d/zk.sh<<'EOF'
#!/bin/bash
export ZK_HOME=/es/softwares/zk
export PATH=$PATH:$ZK_HOME/bin
EOF
#生效
source /etc/profile.d/zk.sh
#创建zk的配置文件
cd /es/softwares/zk/conf/
cp zoo_sample.cfg zoo.cfg
#启动zk节点 服务管理{start status stop restart}
zkServer.sh start
#连接测试
zkCli.sh
#查看节点node
ls /
#退出
quit
#停止zk节点
zkServer.sh st
2.ZooKeeper二进制集群部署
elk01 | 192.168.77.176 | zk176 |
elk02 | 192.168.77.176 | zk177 |
elk03 | 192.168.77.176 | zk178 |
#所有节点操作
#下载二进制软件包
cd /es/softwares
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
#############或节点176 scp拷贝到其他节点#####################
scp /es/softwares/apache-zookeeper-3.8.4-bin.tar.gz root@192.168.77.177:/es/softwares/
scp /es/softwares/apache-zookeeper-3.8.4-bin.tar.gz root@192.168.77.177:/es/softwares/
#############或节点176 scp拷贝到其他节点#####################
#解压
tar xf apache-zookeeper-3.8.4-bin.tar.gz
#创建软连接
ln -s /es/softwares/apache-zookeeper-3.8.4-bin zk
#配置zk环境变量
cat >/etc/profile.d/zk.sh<<'EOF'
#!/bin/bash
export ZK_HOME=/es/softwares/zk
export PATH=$PATH:$ZK_HOME/bin
EOF
#生效
source /etc/profile.d/zk.sh
#创建zk数据和日志目录
mkdir -p /data/softwares/zk/{data,logs}
#创建zk的配置文件
cd /es/softwares/zk/conf/
cat >zoo.cfg<<'EOF'
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/softwares/zk/data
dataLogDir=/data/softwares/zk/logs
clientPort=2181
server.176=192.168.77.176:2888:3888
server.177=192.168.77.177:2888:3888
server.178=192.168.77.178:2888:3888
EOF
#配置服务器标识
#在dataDir指定的目录下创建文件myid,写入与该节点对应的服务器标识(如,在node1上,myid为1;在node2上,myid为2)
#节点1操作
echo 176 >/data/softwares/zk/data/myid
#节点2操作
echo 177 >/data/softwares/zk/data/myid
#节点3操作
echo 178 >/data/softwares/zk/data/myid
#启动Zookeeper服务 所有节点上执行
zkServer.sh start
#查看进程 QuorumPeerMain
jps
netstat -tlunp | grep -E "2181|2888|3888"
#查看zk的角色状态
zkServer.sh status
查看zk的角色状态
[root@elk01 ~]# zkServer.sh status
Mode: follower
[root@elk02 ~]# zkServer.sh status
Mode: leader
[root@elk03 ~]# zkServer.sh status
Mode: follower
四.kafka二进制部署
1.kafka二进制单节点部署 kafka软件包官方下载地址
#下载二进制软件包
cd /es/softwares
wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
#解压
tar xf kafka_2.13-3.9.0.tgz
#创建软连接
ln -s /es/softwares/kafka_2.13-3.9.0 kafka
#配置环境变量
cat >/etc/profile.d/kafka.sh<<'EOF'
#!/bin/bash
export KAFKA_HOME=/es/softwares/kafka
export PATH=$PATH:$KAFKA_HOME/bin
EOF
#生效
source /etc/profile.d/kafka.sh
#修改配置文件
cd /es/softwares/kafka/config/
#修改ID
sed -i 's/broker.id=0/broker.id=176/g' server.properties
#修改连接zk信息
sed -i 's/localhost:2181/192.168.77.176:2181,192.168.77.177:2181,192.168.77.178:2181/g' server.properties
#启动kafka单点
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
#查看进程 Kafka
jps
netstat -tlunp | grep 9092
#停止kafka单点服务
kafka-server-stop.sh
2.kafka二进制集群部署
elk01 | 192.168.77.176 | kafka176 |
elk02 | 192.168.77.176 | kafka177 |
elk03 | 192.168.77.176 | kafka178 |
#所有kafka节点操作
#下载二进制软件包
cd /es/softwares
wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
#############或节点176 scp拷贝到其他节点#####################
scp /es/softwares/kafka_2.13-3.9.0.tgz root@192.168.77.177:/es/softwares/
scp /es/softwares/kafka_2.13-3.9.0.tgz root@192.168.77.177:/es/softwares/
#############或节点176 scp拷贝到其他节点#####################
#解压
tar xf kafka_2.13-3.9.0.tgz
#创建软连接
ln -s /es/softwares/kafka_2.13-3.9.0 kafka
#配置环境变量
cat >/etc/profile.d/kafka.sh<<'EOF'
#!/bin/bash
export KAFKA_HOME=/es/softwares/kafka
export PATH=$PATH:$KAFKA_HOME/bin
EOF
#生效
source /etc/profile.d/kafka.sh
#修改配置文件
cd /es/softwares/kafka/config/
#修改连接zk信息
sed -i 's/localhost:2181/192.168.77.176:2181,192.168.77.177:2181,192.168.77.178:2181/g' server.properties
#修改kafka日志存储目录 自动创建
sed -i 's&/tmp/kafka-logs&/data/softwares/kafka/logs&g' server.properties
#节点1 修改ID为176
cd /es/softwares/kafka/config/
sed -i 's/broker.id=0/broker.id=176/g' server.properties
sed -i '34a\listeners=PLAINTEXT://192.168.77.176:9092' server.properties
#节点2 修改ID为177
cd /es/softwares/kafka/config/
sed -i 's/broker.id=0/broker.id=177/g' server.properties
sed -i '34a\listeners=PLAINTEXT://192.168.77.177:9092' server.properties
#节点3 修改ID为178
cd /es/softwares/kafka/config/
sed -i 's/broker.id=0/broker.id=178/g' server.properties
sed -i '34a\listeners=PLAINTEXT://192.168.77.178:9092' server.properties
#所有节点启动kafka
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
#查看进程 Kafka
jps
netstat -tlunp | grep 9092
#查看zk的源数据
zkCli.sh -server 192.168.77.176:2181,192.168.77.177:2181,192.168.77.178:2181
ls /brokers/ids
#退出
quit
kafka配置文件
#节点176的配置文件
[root@elk01 config]# grep "^[a-Z]" server.properties
broker.id=176
listeners=PLAINTEXT://192.168.77.176:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/softwares/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.77.176:2181,192.168.77.177:2181,192.168.77.178:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
3.验证kafka
创建topic
#创建topic 名字为alibaby 3分区2副本 副本数不能超过Kafka broker数量
kafka-topics.sh --bootstrap-server 192.168.77.176:9092,192.168.77.177:9092,192.168.77.178:9092 --create --partitions 3 --replication-factor 2 --topic alibaby
查看topic
#查看topic列表
kafka-topics.sh --bootstrap-server 192.168.77.176:9092,192.168.77.177:9092,192.168.77.178:9092 --list
#查看指定topic详细信息 如alibaby
kafka-topics.sh --bootstrap-server 192.168.77.176:9092,192.168.77.177:9092,192.168.77.178:9092 --describe --topic alibaby
#查看所有topic详细信息
kafka-topics.sh --bootstrap-server 192.168.77.176:9092,192.168.77.177:9092,192.168.77.178:9092 --describe
创建生产者
#节点176 模拟生产者
kafka-console-producer.sh --bootstrap-server 192.168.77.176:9092 --topic alibaby
#输入数据,模拟生产的数据
>I am alibaby.
>who ara you?
创建消费者
#节点177 模拟消费者
kafka-console-consumer.sh --bootstrap-server 192.168.77.177:9092 --topic alibaby --from-beginning
#稍等就可以到生产者模拟的数据
I am alibaby.
who ara you?
删除topic
#删除topic
kafka-topics.sh --bootstrap-server 192.168.77.176:9092,192.168.77.177:9092,192.168.77.178:9092 --delete --topic alibaby
4.问题解决
#新版本
--bootstrap-server 192.168.77.176:9092,192.168.77.177:9092,192.168.77.178:9092
#旧版本
--zookeeper 192.168.77.176:2181,192.168.77.177:2181,192.168.77.178:2181
报错Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
at joptsimple.OptionParser.parse(OptionParser.java:396)
at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:567)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:47)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
不同版本创建topic的方式不一样,新版本不依赖zookeeper
#当前版本2.13
kafka-topics.sh --create --bootstrap-server 192.168.77.176:9092,192.168.77.177:9092,192.168.77.178:9092 --partitions 3 --replication-factor 2 --topic alibaby
#旧版本
kafka-topics.sh --create --zookeeper 192.168.77.176:2181,192.168.77.177:2181,192.168.77.178:2181 --partitions 3 --replication-factor 2 --topic alibaby