在上一节中,说明了各个节点的基本概念,在这里将会将这些概念进行进一步的叙述(上一节是这篇博客)
一:各节点的作用
Broker
在kafka里面就是一个数据节点
Topic
是一个主题,类似于一张表或ES中的index;在kafka里面没有上面数据类型的说法(来的时候就是一条一条文本数据,发出去的时候就一条一条的发出去)
Partition
分区,与Topic的关系是:Topic里面有多个Partitiom;Patition又分为leader
partition和follower
partition。
- kafka里面的follower,就是作了一个数据的备份,防止数据丢了还有另一个数据可以去查询;如果数据重要就可以多加几个备份
follower partition其实就是replication
,
producer:生产者
consumer:消费者:在kafka里面一个数据可以被多个消费者消费
consumer group:好处在于当这个组里面有一个消费者消费了,这个组里面的其他消费者就不能消费了,一组只能消费一个数据一次
offset:偏移量,即我们读取数据的位置
源数据信息都放到zookeeper上面
二:消息偏移量
2.1 概念说明
生产者将消息发送给broker,broker会将消息保存在本地的日志文件中,如下:
/root/kafka/data/kafka-logs/主题-分区/00000000.log
消息的保存是有序的,通过offset偏移量来描述消息的有序性
消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置
图例解释
topic在物理层面以partition为分组,一个topic可以分成若干个partition
partition还可以细分为Segment,一个partition物理上由多个Segment组成
segment的参数有两个:
- log.segment.bytes:单个segment可容纳的最大数据量,默认为1GB
- log.segment.ms:Kafka在commit一个未写满的segment前,所等待的时间(默认为7天)
LogSegment文件由两部分组成,分别为".index"文件和".log"文件,分别表示为Segment索引文件和数据文件。
- partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。
- 数值大小为64位,20位数字字符长度,没有数字用0填充
第一个segment 00000000000000000000.index 00000000000000000000.1og 第二个segment,文件命名以第一个segment的最后一条消息的offset组成 00000000000000170410.index 00000000000000170410.log 第三个segment,文件命名以上一个segment的最后一条消息的offset组成 00000000000000239430.index 00000000000000239430.log
消息都具有固定的物理结构,包括: offset(8 Bytes)、消息体的大(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、atributes(1 Byte),key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。
图例解释
2.1 实际操作
2.1.1 查看日志路径所在位置
//进入配置文件查看之前配置的日志文件位置
[root@localhost bin]# cat ../config/server.properties
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
//这个就是日志文件的位置
log.dirs=/root/kafka/data/kafka-logs
...
...
group.initial.rebalance.delay.ms=0
[root@localhost bin]#
2.1.2 进入日志文件路径
[root@localhost local]# cd /root/kafka/data/kafka-logs
[root@localhost kafka-logs]# ls
baidu-0 __consumer_offsets-23 __consumer_offsets-44
baidu-1 __consumer_offsets-24 __consumer_offsets-45
baidu-2 __consumer_offsets-25 __consumer_offsets-46
baidu-3 __consumer_offsets-26 __consumer_offsets-47
baidu-4 __consumer_offsets-27 __consumer_offsets-48
baidu-5 __consumer_offsets-28 __consumer_offsets-49
cleaner-offset-checkpoint __consumer_offsets-29 __consumer_offsets-5
__consumer_offsets-0 __consumer_offsets-3 __consumer_offsets-6
__consumer_offsets-1 __consumer_offsets-30 __consumer_offsets-7
__consumer_offsets-10 __consumer_offsets-31 __consumer_offsets-8
__consumer_offsets-11 __consumer_offsets-32 __consumer_offsets-9
__consumer_offsets-12 __consumer_offsets-33 dblab01-0
__consumer_offsets-13 __consumer_offsets-34 log-start-offset-checkpoint
__consumer_offsets-14 __consumer_offsets-35 meta.properties
__consumer_offsets-15 __consumer_offsets-36 recovery-point-offset-checkpoint
__consumer_offsets-16 __consumer_offsets-37 replication-offset-checkpoint
__consumer_offsets-17 __consumer_offsets-38 studentlog-0
__consumer_offsets-18 __consumer_offsets-39 studentlog-1
__consumer_offsets-19 __consumer_offsets-4 studentlog-2
__consumer_offsets-2 __consumer_offsets-40 studentlog-3
__consumer_offsets-20 __consumer_offsets-41 studentlog-4
__consumer_offsets-21 __consumer_offsets-42 studentlog-5
__consumer_offsets-22 __consumer_offsets-43 userlog-0
[root@localhost kafka-logs]#
2.1.3 查看
[root@localhost kafka-logs]# cd userlog-0/
[root@localhost userlog-0]# ll
总用量 8
//
-rw-r--r--. 1 root root 10485760 1月 20 20:11 00000000000000000000.index
//真正保存kafka消息的文件
-rw-r--r--. 1 root root 144 1月 20 20:33 00000000000000000000.log
//
-rw-r--r--. 1 root root 10485756 1月 20 20:11 00000000000000000000.timeindex
-rw-r--r--. 1 root root 8 1月 20 20:11 leader-epoch-checkpoint
[root@localhost userlog-0]# cat 00000000000000000000.log
NQ~wrH~wrHddd=~wz&G~WZ&G
三:单播和多播消息的实现
在一个kafka的topic中,启动两个消费者,一个生产者,问:生产者发送消息,这条消息是否同时会被两个消费者消费?
如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的topic中的消息。换言之,同一个消费组中只能有一个消费者收到一个topic中的消息
。
3.1 单播消息实现
配置消费者组:从头开始(会将之前输入过的内容再次输出)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --consumer-property group.id=testGroup --topic userlog
下面我将步使用beginning,避免之前的数据干扰
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic userlog
3.1.1 概念说明
单播消息:一个消费组里只会有一个消费者能消费到某一个topic中的消息。于是可以创建多个消费者,这些消费者在同一个消费组中。
3.1.2 实践
3.1.2.1 启动生产者
[root@localhost ~]# cd kafka//kafka_2.11-2.4.1/bin/
[root@localhost bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic userlog
>
3.1.2.2 启动消费者
消费者1:
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic userlog
first
测试:
现在启动另一个消费者
消费者2:
[root@localhost ~]# cd kafka/kafka_2.11-2.4.1/bin/
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic userlog
现在开始测试:
通过实践证明:新的消费者上线之后变成新的消费者进行消费了,这个消费不是轮询的,现在的生产的消息都是新的消费者消费
3.2 多播消息实现
3.2.1 概念说明
不同的消费组订阅同一个topic,那么这些不同的消费组中只有一个消费者能收到消息。多个消费组中的多个消费者收到了同一个消息。
说明:
- 一个消息只能被同一个消费者组里面的一个消费者消费
- 一个消费者可以被所有不同消费者组里面的消费者同时消费
3.2.1 实践
停掉之前的消费者2
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic userlog
second
thid
thir
ceishi
^CProcessed a total of 4 messages
[root@localhost bin]#
新建一个消费者,但是与之前的不是同一个消费者组
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic userlog
测试结果:
四:查看消费者组的详细信息
4.1 参数说明
Currennt-offset:当前消费组的已消费偏移量
Log-end-offset:主题对应分区消息的结束偏移量(HW)
Lag:当前消费组未消费的消息数
4.2 查看当前主题下有哪些消费组
#查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
测试
[root@localhost ~]# cd kafka/kafka_2.11-2.4.1/bin/
[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
testGroup1
testGroup
[root@localhost bin]#
4.3 查看消费组中的具体信息
#查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
#查看 testGroup 组
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
测试:
[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testGroup userlog 0 8 8 0 consumer-testGroup-1-ca34d0f6-5379-4f79-88e0-1cdbc05f307a /127.0.0.1 consumer-testGroup-1
[root@localhost bin]#