kafka
什么是消息队列
在学习Kafka之前我们先来看一下什么是消息队列 消息队列(Message Queue):可以简称为MQ
例如:Java中的Queue队列,也可以认为是一个消息队列
消息队列:顾名思义,消息+队列,其实就是保存消息的队列,属于消息传输过程中的容器。 消息队列主要提供生产、消费接口供外部调用,做数据的存储和读取
消息队列分类
消息队列大致可以分为两种:点对点(P2P)、发布订阅(Pub/Sub)
· 共同点: 针对数据的处理流程是一样的 消息生产者生产消息发送到queue中,然后消息消费者从queue中读取并且消费消息。
· 不同点: 点对点(p2p)模型包含:消息队列(Queue)、发送者(Sender)、接收者(Receiver) 一个生产者生产的消息只有一个消费者(Consumer)(消息一旦被消费,就不在消息队列中)消费。例如QQ中的私聊,我发给你的消息只有你能看到,别人是看不到的
发布订阅(Pub/Sub)模型包含:消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber) 每个消息可以有多个消费者,彼此互不影响。比如我发布一个微博:关注我的人都能够看到,或者QQ中的群聊,我在群里面发一条消息,群里面所有人都能看到
这就是这两种消息队列的区别
我们接下来要学习的Kafka这个消息队列是属于发布订阅模型的
什么是Kafka
Kafka 是一个 高吞吐量 的、 持久性 的、 分布式 发布订阅消息系统
· 高吞吐量:可以满足每秒百万级别消息的生产和消费。 为什么这么快? 难道Kafka的数据是放在内存里面的吗? 不是的,Kafka的数据还是放在磁盘里面的 主要是Kafka利用了磁盘顺序读写速度超过内存随机读写速度这个特性。 所以说它的吞吐量才这么高
· 持久性:有一套完善的消息存储机制,确保数据高效安全的持久化。
· 分布式:它是基于分布式的扩展、和容错机制;Kafka的数据都会复制到几台服务器上。当某一台机器故障失效时,生产者和消费者切换使用其它的机器。
Kafka的数据时存储是磁盘中的,为什么可以满足每秒百万级别消息的生产和消费? 这是一个面试题,其实就是我们刚才针对高吞吐量的解释:kafka利用了磁盘顺序读写速度超过内存随机读写速度这个特性
Kafka主要应用在实时计算领域,可以和Flume、Spark、Flink等框架结合在一块使用
例如:我们使用Flume采集网站产生的日志数据,将数据写入到Kafka中,然后通过Spark或者Flink从Kafka中消费数据进行计算,这其实是一个典型的实时计算案例的架构
Kafka组件介绍
接下来我们来分析一下Kafka中的组件,加深对kafka的理解
看这个图
先看中间的Kafka Cluster 这个Kafka集群内有两个节点,这些节点在这里我们称之为Broker Broker:消息的代理,Kafka集群中的一个节点称为一个broker
在Kafka中有Topic的概念 Topic:称为主题,Kafka处理的消息的不同分类(是一个逻辑概念)。 如果把Kafka认为是一个数据库的话,那么Kafka中的Topic就可以认为是一张表 不同的topic中存储不同业务类型的数据,方便使用
在Topic内部有partition的概念 Partition:是Topic物理上的分组,一个Topic会被分为1个或者多个partition(分区),分区个数是在创建topic的时候指定。每个topic都是有分区的,至少1个。 注意:这里面针对partition其实还有副本的概念,主要是为了提供数据的容错性,我们可以在创建Topic的时候指定partition的副本因子是几个。 在这里面副本因子其实就是2了,其中一个是Leader,另一个是真正的副本 Leader中的这个partition负责接收用户的读写请求,副本partition负责从Leader里面的partiton中同步数据,这样的话,如果后期leader对应的节点宕机了,副本可以切换为leader顶上来。
在partition内部还有一个message的概念 Message:我们称之为消息,代表的就是一条数据,它是通信的基本单位,每个消息都属于一个partition。
在这里总结一下 Broker>Topic>Partition>Message
接下来还有两个组件,看图中的最左边和最右边 Producer:消息和数据的生产者,向Kafka的topic生产数据。 Consumer:消息和数据的消费者,从kafka的topic中消费数据。 这里的消费者可以有多个,每个消费者可以消费到相同的数据
最后还有一个Zookeeper服务,Kafka的运行是需要依赖于Zookeeper的,Zookeeper负责协调Kafka集群的正常运行。
kafka 单机部署
1,首先部署jdk
参考下面
[root@hadoop1 html]# mv jdk1.8* /opt/
[root@hadoop1 html]# cd /opt/
[root@hadoop1 opt]# ll
总用量 0
drwxr-xr-x 7 10 143 245 10月 5 2019 jdk1.8.0_231
drwxr-xr-x. 2 root root 6 10月 31 2018 rh
[root@hadoop1 opt]# mv jdk1.8.0_231/ jdk1.8
vi /etc/profile
export JAVA_HOME=/opt/jdk1.8
export PATH=$JAVA_HOME/bin:$PATH
[root@hadoop1 opt]# chmod -R 755 /opt/jdk1.8
[root@hadoop1 opt]# source /etc/profile
[root@hadoop1 opt]# java -version
2,部署zk
tar zxvf apache-zookeeper-3.5.9-bin.tar.gz -C /opt/
修改配置文件
首先将zoo_sample.cfg
重命名为zoo.cfg
然后修改zoo.cfg
中的dataDir
参数的值,dataDir
指向的目录存储的是zookeeper的核心数据,所以这个目录不能使用tmp目录
3,
[root@localhost conf]# grep "data" zoo.cfg
dataDir=/data1/zk
# The number of snapshots to retain in dataDir
[root@localhost conf]# pwd
/opt/apache-zookeeper-3.5.9-bin/conf
4.启动
[root@localhost bin]# ./zkServer.sh start
集群部署
zookeeper集群安装
首先还原单节点
rm -rf apache-zookeeper-3.5.9-bin
解压
tar zxvf apache-zookeeper-3.5.9-bin.tar.gz
修改配置
将zoo_sample.cfg
重命名为zoo.cfg
[root@bigdata01 conf]# mv zoo_sample.cfg zoo.cfg
[root@bigdata01 conf]# pwd
/data/soft/apache-zookeeper-3.5.9-bin/conf
然后修改zoo.cfg中的dataDir
参数的值,dataDir指向的目录存储的是zookeeper的核心数据,所以这个目录不能使用tmp目录,然后增加server.0
、server.1
、server.2
这三行内容
dataDir=/data/soft/apache-zookeeper-3.5.9-bin/data
最后
server.0=bigdata01:2888:3888
server.1=bigdata02:2888:3888
server.2=bigdata03:2888:3888
创建目录保存myid
文件,并且向myid
文件中写入内容
myid
中的值其实是和zoo.cfg
中server
后面指定的编号是一一对应的
编号0
对应的是bigdata01
这台机器,所以在这里指定0
在这里使用echo 和 重定向 实现数据写入
[root@bigdata01 apache-zookeeper-3.5.9-bin]# pwd
/data/soft/apache-zookeeper-3.5.9-bin
[root@bigdata01 apache-zookeeper-3.5.9-bin]# mkdir data
[root@bigdata01 apache-zookeeper-3.5.9-bin]# cd data/
[root@bigdata01 data]# echo 0 > myid
配置复制到从节点
[root@bigdata01 soft]# scp -rq apache-zookeeper-3.5.9-bin bigdata02:/data/soft
[root@bigdata01 soft]# scp -rq apache-zookeeper-3.5.9-bin bigdata03:/data/soft
去从节点修改配置
[root@bigdata02 ~]# cd /data/soft/apache-zookeeper-3.5.9-bin/data/
[root@bigdata02 data]# ll
总用量 4
-rw-r--r--. 1 root root 2 3月 30 18:34 myid
[root@bigdata02 data]# echo 1 > myid
[root@bigdata02 data]# cat myid
[root@bigdata03 soft]# cd /data/soft/apache-zookeeper-3.5.9-bin/data/
[root@bigdata03 data]# echo 2 > myid
所有节点启动
[root@bigdata02 apache-zookeeper-3.5.9-bin]# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/soft/apache-zookeeper-3.5.9-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@bigdata02 apache-zookeeper-3.5.9-bin]#
分别在bigdata01、bigdata02、bigdata03上执行jps命令验证是否有QuorumPeerMain进程 如果都有就说明zookeeper集群启动正常了 如果没有就到对应的节点的logs目录下查看zookeeper-.out日志文件
执行bin/zkServer.sh status 命令会发现有一个节点显示为leader,其他两个节点为follower
kafka
前提zk jdk 没问题
kafka 安装参考下面
推荐
172.22.192.30
172.22.192.151
172.22.192.211
cd /app/kafka
tar zxvf kafka.tar.gz
[root@es-node-1 config]# cd /app/kafka/kafka/config
cp server.properties server.properties.bak
grep -Ev '^#|^$' server.properties > ok1
mv ok1 server.properties
[root@jx-aep-log-monitor-0001 config]# grep -Ev '^#|^$' server.properties
broker.id=1
listeners=PLAINTEXT://172.22.192.30:5879
auto.create.topics.enable=true
delete.topic.enable=true
advertised.listeners=PLAINTEXT://172.22.192.30:5879
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/kafka/data
num.partitions=6
default.replication.factor=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# zk 地址
zookeeper.connect=172.22.192.121:22181,172.22.192.6:22181,172.22.192.131:22181/aep-log-center
zookeeper.connection.timeout.ms=6000000
group.initial.rebalance.delay.ms=0
将包发送到其他主机
35 2022-05-09 11:09:35 root scp -rq kafka root@172.27.64.102:/app/
36 2022-05-09 11:10:51 root scp -rq kafka root@172.27.64.52:/app/
主机2 修改配置
[root@jx-aep-log-monitor-0002 config]# grep -E "listeners=PLAINTEXT|advertised|broker.id|zookeeper.connect=" /app/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://172.27.64.102:5879
advertised.listeners=PLAINTEXT://172.27.64.102:5879
zookeeper.connect=172.22.192.121:22181,172.22.192.6:22181,172.22.192.131:22181/aep-log-center
主机3 修改配置
[root@jx-aep-log-monitor-0003 config]# grep -E "listeners=PLAINTEXT|advertised|broker.id|zookeeper.connect=" /app/kafka/config/server.properties
broker.id=3
listeners=PLAINTEXT://172.27.64.52:5879
advertised.listeners=PLAINTEXT://172.27.64.52:5879
zookeeper.connect=172.22.192.121:22181,172.22.192.6:22181,172.22.192.131:22181/aep-log-center
启动:
cd /app/kafka/bin/
./kafka-server-start.sh -daemon ../config/server.properties
ps -elf | grep kafka
报错 将data 目录删除重新启动
创建topic
./kafka-topics.sh --create --zookeeper 172.22.192.121:22181,172.22.192.6:22181,172.22.192.131:22181/aep-log-center --replication-factor 2 --partitions 1 --topic aep-logs
./kafka-topics.sh --create --zookeeper 172.22.192.121:22181,172.22.192.6:22181,172.22.192.131:22181/aep-log-center --replication-factor 2 --partitions 1 --topic nginx-logs
列出
./kafka-topics.sh --list --zookeeper 172.22.192.121:22181,172.22.192.6:22181,172.22.192.131:22181/aep-log-center
查看是否进行了消费
./kafka-console-consumer.sh --bootstrap-server 172.28.64.9:5879 --topic aep-logs --from-beginning
./kafka-console-consumer.sh --bootstrap-server 172.28.64.9:5879 --topic nginx-logs --from-beginning
删除topic
./kafka-topics.sh --delete --zookeeper 172.17.66.127:22181,172.17.66.232:22181,172.17.66.158:22181/aep-log-center --topic aep-logsss
kafka 常用基本命令
· 新增Topic:指定2个分区,2个副本,注意:副本数不能大于集群中Broker的数量
./kafka-topics.sh --create --bootstrap-server 192.168.40.180:9092 --replication-factor 1 --partitions 1 --topic test2
[root@localhost kafka]# grep -Ev "^#|^$" config/server.properties
broker.id=3
listeners=PLAINTEXT://192.168.40.182:9092
advertised.listeners=PLAINTEXT://192.168.40.182:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
#文件存储目录
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.40.180:2181,192.168.40.181:2181,192.168.40.182:2181/kafka
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
#自己添加的
auto.create.topics.enable=true
delete.topic.enable=true
default.replication.factor=3
生产环境配置对比
[root@xxxxx config]# grep -Ev '^#|^$' server.properties
broker.id=1
listeners=PLAINTEXT://xxxxx
auto.create.topics.enable=true
delete.topic.enable=true
advertised.listeners=PLAINTEXT://xxxx
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/kafka/data
num.partitions=6
default.replication.factor=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# zk 地址
zookeeper.connect=xxxx
zookeeper.connection.timeout.ms=6000000
group.initial.rebalance.delay.ms=0
查看详细topic信息
/kafka-topics.sh --describe --bootstrap-server 192.168.40.182:9092 --topic xlisi-z