0
点赞
收藏
分享

微信扫一扫

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】


文章目录

  • ​​Kafka​​
  • ​​Kafka的使用场景​​
  • ​​Kafka基本概念​​
  • ​​Kafka的初体验​​
  • ​​创建主题​​
  • ​​查看kafka中目前存在的topic​​
  • ​​删除主题​​
  • ​​发送消息​​
  • ​​消费消息​​
  • ​​消费之前的消息​​
  • ​​消费最新的消息​​
  • ​​消费之前多主题的消息​​
  • ​​消费模式​​
  • ​​单播消费​​
  • ​​多播消费​​
  • ​​查看消费组名​​
  • ​​查看消费组的消费偏移量​​
  • ​​查看消费组testGroup和testGroup2的消费偏移量​​
  • ​​消费组的消费偏移量参数解释:​​
  • ​​发送消息进行理解消费组的消费偏移量​​
  • ​​发送一条消息​​
  • ​​查看偏移量​​
  • ​​客户端的消费组testGroup开始消费​​
  • ​​再次查看消费组testGroup和testGroup2的消费偏移量​​


本文内容:

Kafka

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_java

Kafka的使用场景

  • 日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

Kafka基本概念

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_java_02

  • Broker:消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群。
  • Topic:Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic。
  • Producer:消息生产者,向Broker发送消息的客户端。 Consumer:消息消费者,从Broker读取消息的客户端。
  • ConsumerGroup:每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的ConsumerGroup消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息。
  • Partition:物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的。

Kafka的初体验

创建主题


创建一个名字为“test”的Topic,这个topic只有一个partition,并且备份因子也设置为1。当producer发布一个消息到某个指定的Topic,这个Topic如果不存在,就自动创建。


/opt/kafka_2.13-2.7.1/bin/kafka-topics.sh --create --zookeeper 106.14.132.94:2181 --replication-factor 1 --partitions 1 --topic test

查看kafka中目前存在的topic

/opt/kafka_2.13-2.7.1/bin/kafka-topics.sh --list --zookeeper 106.14.132.94:2181

列表中有一个__consumer_offsets主题,这个主题不能删除哟

删除主题

/opt/kafka_2.13-2.7.1/bin/kafka-topics.sh --delete --topic test --zookeeper 106.14.132.94:2181


如果出现This will have no impact if delete.topic.enable is not set to true.


彻底删除topic:

[1. ] 删除Topic,delete.topic.enable=true这里要设置为true,需要在$KAFKA/config/server.properties中配置​​delete.topic.enble=true​

[2. ] 删除log日志

[3. ] 删除ZK中的Topic记录

删除列表中有一个__consumer_offsets主题会出现,Topic __consumer_offsets is a kafka internal topic and is not allowed to be marked for deletion.


__consumer_offsets这个topic是由kafka自动创建的,默认49个,这个topic是不能被删除的!


为什么这里会是这样存储__consumer_offsets的呢?

[1.] 将所有 N Broker 和待分配的 i 个 Partition 排序
[2.] 将第 i 个 Partition 分配到第(i mod n)个 Broker 上

发送消息


kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。首先我们要运行发布消息的脚本,然后在命令中输入要发送的消息的内容。


/opt/kafka_2.13-2.7.1/bin/kafka-console-producer.sh --broker-list 106.14.132.94:9092 --topic test


this is a news
this is a another news


消费消息


对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。由于我们已经发送了消息了,想要消费之前的消息可以通过–from-beginning参数指定。


消费之前的消息
/opt/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --from-beginning --topic test
消费最新的消息
/opt/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --topic test

通过不同的终端窗口来运行以上的命令,你将会看到在producer终端输入的内容,很快就会在consumer的终端窗口上显示出来。

消费之前多主题的消息

/opt/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --from-beginning --whitelist "test|test2"

消费模式

单播消费


一条消息只能被某一个消费者消费的模式,类似queue模式,只需让所有消费者在同一个消费组里即可。


分别在两个客户端执行如下消费命令,然后往主题里发送消息,结果只有一个客户端能收到消息。

发送消息:​​/opt/kafka_2.13-2.7.1/bin/kafka-console-producer.sh --broker-list 106.14.132.94:9092 --topic test​

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_kafka_03

客户端1:​​/opt/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --consumer-property group.id=testGroup --topic test​

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_偏移量_04

客户端2:​​/opt/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --consumer-property group.id=testGroup --topic test​

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_java_05

多播消费


一条消息能被多个消费者消费的模式,类似publish-subscribe模式费,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。我们再增加一个消费者,该消费者属于testGroup-2消费组,结果两个客户端都能收到消息。


发送消息:​​/opt/kafka_2.13-2.7.1/bin/kafka-console-producer.sh --broker-list 106.14.132.94:9092 --topic test​


this is a more news
this is a more new2
this is a more new3


【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_分布式_06

客户端1:​​/opt/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --consumer-property group.id=testGroup --topic test​

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_分布式_07

客户端2:​​/opt/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --consumer-property group.id=testGroup --topic test​

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_偏移量_08

客户端3:​​/opt/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --consumer-property group.id=testGroup2 --topic test​

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_分布式_09

查看消费组名

/opt/kafka_2.13-2.7.1/bin/kafka-consumer-groups.sh --bootstrap-server 106.14.132.94:9092 --list

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_偏移量_10

查看消费组的消费偏移量

查看消费组testGroup和testGroup2的消费偏移量

/opt/kafka_2.13-2.7.1/bin/kafka-consumer-groups.sh --bootstrap-server 106.14.132.94:9092 --describe --group testGroup
/opt/kafka_2.13-2.7.1/bin/kafka-consumer-groups.sh --bootstrap-server 106.14.132.94:9092 --describe --group testGroup2

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_偏移量_11

消费组的消费偏移量参数解释:

  • current-offset:当前消费组的已消费偏移量
  • log-end-offset:主题对应分区消息的结束偏移量(HW)
  • lag:当前消费组未消费的消息数

发送消息进行理解消费组的消费偏移量


1个客户端的消费组是testGroup,一个客户端的消费组是testGroup,他们二个都是同一个test主题。


发送一条消息
/opt/kafka_2.13-2.7.1/bin/kafka-console-producer.sh --broker-list 106.14.132.94:9092 --topic test


this is a testgroup message


查看偏移量
/opt/kafka_2.13-2.7.1/bin/kafka-consumer-groups.sh --bootstrap-server 106.14.132.94:9092 --describe --group testGroup

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_客户端_12


当前消费组的已消费偏移量(current-offset)没变,说明当前的消费组没有消费者进行消费,因为没有启动消费者。主题对应分区消息的结束偏移量(log-end-offset)加1了,说明当前分区的消息多了一条。当前消费组未消费的消息数(lag)加1了,说明有1条消息没有被消费。


客户端的消费组testGroup开始消费
/opt/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092  --consumer-property group.id=testGroup --topic test
再次查看消费组testGroup和testGroup2的消费偏移量
/opt/kafka_2.13-2.7.1/bin/kafka-consumer-groups.sh --bootstrap-server 106.14.132.94:9092 --describe --group testGroup
/opt/kafka_2.13-2.7.1/bin/kafka-consumer-groups.sh --bootstrap-server 106.14.132.94:9092 --describe --group testGroup2

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_java_13


发现testGroup消费组当前消费组的已消费偏移量(current-offset)加1了,当前消费组未消费的消息数(lag)减1了,说明testGroup已经被消费了。testGroup2消费者还是一样没有发生变化,所以可以推测出,消费者是通过不同的消费组进行消费的,每个消费组互不影响。


总结


以上就是今天要讲的内容,还希望各位读者大大能够在评论区积极参与讨论,给文章提出一些宝贵的意见或者建议????,合理的内容,我会采纳更新博文,重新分享给大家。


????四连 关注????点赞????收藏⭐️留言????


感谢大家的支持,用心写博文分享给大家,你的支持(????点赞????收藏⭐️留言????)是对我创作的最大帮助。
????微信公众号:南北踏尘
????主页地址:java_wxid
????社区地址:幕后大佬


给读者大大的话


我本身是一个很普通的程序员,放在人堆里,除了与生俱来的????盛世美颜????、所剩不多的发量,就剩下180的大高个了。就是我这样的一个人,默默坚持写博文也有好多年了,有句老话说的好,????牛逼之前都是傻逼式的坚持????。希望自己可以通过大量的作品,时间的积累,个人魅力、运气和时机,可以打造属于自己的????技术影响力????。同时也希望自己可以成为一个????懂技术????,????懂业务????,????懂管理????的综合型人才,作为项目架构路线的总设计师,掌控全局的????团队大脑????,技术团队中的????绝对核心????是我未来几年不断前进的目标。


提示:以下都是资源分享,求个一键三连。

面试资料


福利大放送,????欢迎关注????点赞????收藏⭐️留言????,拜托了????,这对我真的很重要。
点击:​​面试资料​​ 提取码:2021


200套PPT模板


福利大放送,????欢迎关注????点赞????收藏⭐️留言????,拜托了????,这对我真的很重要。
点击:​​200套PPT模板​​ 提取码:2021


提问的智慧


福利大放送,????欢迎关注????点赞????收藏⭐️留言????,拜托了????,这对我真的很重要。
点击:​​提问的智慧​​ 提取码:2021


Java开发学习路线

名称

链接

JavaSE

点击: ​​JavaSE ​​

MySQL专栏

点击: ​​MySQL专栏​​

JDBC专栏

点击: ​​JDBC专栏​​

MyBatis专栏

点击: ​​MyBatis专栏​​

Web专栏

点击: ​​Web专栏​​

Spring专栏

点击: ​​Spring专栏​​

SpringMVC专栏

点击: ​​SpringMVC专栏​​

SpringBoot专栏

点击: ​​SpringBoot专栏​​

SpringCould专栏

点击: ​​SpringCould专栏​​

Redis专栏

点击: ​​Redis专栏​​

Linux专栏

点击: ​​Linux专栏​​

Maven3专栏

点击: ​​Maven3专栏 ​​

Spring Security5专栏

点击: ​​Spring Security5专栏​​

更多专栏

更多专栏,请到 ​​java_wxid主页​​ 查看

P5学习路线图

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_java_14P6学习路线图

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_客户端_15P7学习路线图

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_客户端_16P8学习路线图

【Kafka使用场景/基本概念/初体验/消费模式/消费组的偏移量】_偏移量_17



举报

相关推荐

0 条评论