0
点赞
收藏
分享

微信扫一扫

Kafka笔记 (二) Kafka一些基础的概念与设计

盖码范 2022-03-12 阅读 53

1. 事件、生产者和消费者

事件(Event)、生产者(Producer)和消费者(Consumer)是Kafka最基础的三个元素,Event就是要传递的数据,不同语境下也可以说成是记录或者消息;生产者就是产生事件的应用,而消费者就是接收并处理事件的应用。生产者到事件到消费者的模式,就是发布\订阅模型的一个实现,不过和常见的发布\订阅系统相比较,Kafka最大的特点在于消息(事件)并非是生产者推送(Push)给消费者的,而是消费者自己根据需要进行拉取(Pull)的,事件将生产者和消费者完全解耦

事件的内容

目前一个Kafka事件的内容由三部分构成:

  • Data. 就是实际要传递的数据,它是一个key-value结构,消息的key可以自由指定,而value存储实际的数据
    请添加图片描述
  • Header. 这是Apache Kafka 0.11标准之后加入的内容,其实就是可以自定义的键值对,用来扩展事件记录的元数据,例如生产者的id,方便下游应用进行路由等操作。Header一般用的不多。
  • Metadata. 是Kafka自动给事件生成的元数据,其中最关键的信息就是时间戳Timestamp
    请添加图片描述
    对于Kafka事件的内容组成,你可能会比较困惑为什么把要把Data设计出key-value,这里的key有什么作用。其实key最主要的作用是用于分区Partition,下面的小节会详细描述;在大部分简单的kafka应用里,可以把key置空

典型的消费过程

可以用下面的示意图来大致描述默认情况下事件如何被消费者消费。在那之前先得了解几个概念:

  • 事件在Kafka里的储存方式可以就看成是日志文件,也就是说每个事件就是日志文件里的一条记录;
  • Topic是事件的组织单位,每条事件归属于某个Topic,可以把一个Topic看成是一个日志文件
  • 因为事件相当于日志文件里的记录,而每条记录在文件里都有一个偏移量offset,所以这个offset也就是这个事件的索引,要读取某个事件,就是根据它的offset直接从文件中读取数据

现在我们来看下事件的消费过程,以及过程中存在的一些情况:

首先生产者Producer向指定的Topic生产数据,正如上面所说的,存储事件的数据,就是在日志文件上添加记录,事件依次追加到日志文件末尾,每条事件有一个递增的偏移量
请添加图片描述
对于消费者Consumer,它读取数据同样按照偏移量依次从订阅的Topic中读取数据
请添加图片描述
因为消费者是主动去获取数据的,为了追溯它读取数据的位置,消费者需要定期对这个Topic进行提交Commit操作,其实就是记录当前读取到的事件的偏移量;下面的图里,在消费到记录4的时候,进行了Commit操作;消费者默认的Commit行为是周期性Commit,所以它下一次Commit,应该发生在读取记录9的时候(当然也可以手动Commit,不过不太推荐,因为太频繁的手动Commit也比较影响性能)
请添加图片描述
现在我们假设消费者程序在读取到记录7的时候崩溃停止了,而生产者则继续在生产数据
请添加图片描述
当生产者生产到记录10的时候,消费者程序恢复了运行请添加图片描述
恢复运行的消费者程序会查询到之前的Commit记录,知道最近的偏移量是4,所以将从记录4开始恢复读取数据
请添加图片描述
但是消费程序奔溃前其实是消费到记录6的,而重启后从记录5开始恢复读取数据,这也就意味着将存在重复读取的数据,既图中标红的5和6
请添加图片描述
存在这种情况并非是Kafka的设计缺陷,而是体现它的设计思想,即数据的使用权在消费者自身,消费者需要自己按照合适的应用场景来决定如何消费数据:

  • 如果这是一个工业数据采集的场景,事件里的数据是传感器的采样数据,而消费者程序的目的就是记录所有的采样数据,那么为了保证数据的完整性,它就只能按照上面的示意图的消费方式进行,来避免有数据遗失,而重复数据的问题则需要它自己处理
  • 如果这是一个实时数据在线显示的场景,消费者程序的目的只是为了在网页端实时显示最新的数据,它对数据完整性没有要求,那么它可以不采用上面这种默认的消费模式;事实上,默认的消费模式叫做earliest模式,如果改成latest模式,那么消费程序则只会读取最新的消息,忽略中间可能漏掉的数据

还有个问题是消费者的Commit记录在哪里,记录消费者最近的事件偏移量不会存在于消费端,而是由Kafka集群来记录。事实上,在Kafka中有个特殊的Topic叫 _ _ c o n s u m e r _ o f f s e t s \_\_consumer\_offsets __consumer_offsets,专门管理消费者的读取记录,消费者从这个Topic获取自己当前的偏移量,并且在里面存放提交时的偏移量请添加图片描述

2. Kafka就是日志文件系统

我们可以把Kafka的核心功能就看成是一个日志文件系统,因为Kafka最初就是以日志为核心的思想进行设计的(The Log: What every software engineer should know about real-time data’s unifying abstraction),并且最终的实现中也的确可以看到很多基于日志显而易见的优点:

  • 日志文件是持久化在磁盘里的,所以事件的所有记录都没有丢失(除非到了清理周期),这让Kafka表现的不仅是一个消息中间件,更像是一个缓存数据库
  • 消息或者事件简化的看就是时间戳加数据,这种形式就等同于日志记录,并且新的消息只需要直接追加到日志的末尾,因为日志的记录是按时间戳排序的
  • 对消息的读取可以按照偏移量进行依次读取,这相当于复杂度为 O ( 1 ) O(1) O(1)的操作,很多不采用日志设计的消息中间件,消息的索引结构的查询复杂度最好也只能到 O ( l o g N ) O(logN) O(logN)的程度

但同时,对于日志文件这个设计,很多人会怀疑它的性能问题,毕竟在常识里,文件的读写是很慢的。关于这一点,Confluent Kafka的官方文档(Persistence)对此进行了解释,并说明了Kafka使用文件结构时的优化设计

首先为什么不采用内存存储而是文件存储?这从两方面进行的考虑,一方面,从内存存储的方式进行考虑,Kafka本身是Java写的应用,用内存存储将涉及到JVM的性能,Kafka设计者经过测试发现:

  • 数据的对象化将额外多出很多空间占用,往往冗余的空间占用基本等于实际的数据占用了
  • 当数据量越来越大时,JVM的垃圾回收将会严重影响性能

另一方面,现在磁盘的读写也不会像想象的那么慢,只要设计得当,性能可能反而更高,其中最让人惊讶的一个结果是,某些情况下序列化的磁盘读写比随机化的内存读写更快。

在决定使用文件存储的基础上,Kafka的设计者们进行下面这些优化设计,来提升Kafka的性能

  • 使用页缓存进行文件读写. 现代的操作系统提供了页缓存机制来优化磁盘的读写,Kafka的源码中就使用页缓存的底层接口来操作磁盘
  • 批处理数据. 也就是写入或者读出数据都是当积累到合适的数量才进行,这样的主要目的是为了减少过多的小的IO操作
  • 制定统一的二进制消息格式标准,无论是生产者、消费者还是Kafka Broker,都按照这个标准处理数据,这样数据在他们之间的流动就不需要额外的格式转换处理
  • 零拷贝. 零拷贝是现代linux系统提供的减少数据转移过程中中间拷贝次数的功能,Kafka源码中调用了sendfile()这个系统接口实现零拷贝(不过这是基于上面那条统一的二进制格式标准)

3. 消息的分布式和并行化处理

分区 Partition

分区是Topic更下一级的划分,其实Partition是分布式环境下很自然而然产生的概念. 如果Kafka集群只有一个实例,那么Partition是没有必要的,因为所有的数据都在这个实例的Topic下
请添加图片描述
当我们对Kafka集群进行横向扩展,加入多个实例,那么这时Topic就是一个虚拟的概念,它下面的消息应当分布在多个实例中,每个实例里的消息就属于一个Partition,多个实例的多个Partition构成一个Topic
请添加图片描述
所以Topic是一个抽象的集合,而Partition则是具体的实现。

在分区之后对于生产者随之而来了一个问题,如何决定一个消息放到哪个分区中?Kafka是按照下面的策略进行分配的:

  • 如果生产者程序在向Kafka发送数据的时候指定了Partition值,那么直接把该消息分配到指定的Partition
  • 如果生产者程序没有指定Partition值,但是消息的Key非空,则使用一个Hash算法计算这个Key对应的Partition值;所以如果所有的消息的key都一样,那么最终所有的消息都被分配到同一个Partition内
  • 如果没有指定Partition,Key也是空值,那么Kafka使用一个分配算法尽量把消息均匀分配到各个Partition

消费组 Consumer Group

消费组这个概念是随着分区的引入而出现的。在大多数简单的应用场景下,我们的消费程序是独立的应用,就算消息是分布在Kafka集群的多个Partition中,对消费者程序来说和单实例没什么区别,因为它都是要获取所有的消息
请添加图片描述
在某些复杂的场景下,我们的消费者程序也可能是多实例的,例如我们的消费者应用是对高频消息进行数据处理的,为了最大化处理速度,我们平行运行多个相同的消费者程序,来处理所有的消息。这种情况下,让每个消费者实例都接收所有的分区数据显然是没有必要的,自然而然地,我们应该让每个消费者实例都只处理部分分区的数据,如下图所示,3个消费者实例,正好每个消费者接收一个分区数据,这三个消费者实例就构成一个消费组
请添加图片描述
不过,使用消费组也有个需要考虑的问题,每个消费实例只消费Topic内部分的消息,而且Kafka只保证单个Partition内的消息是按照时间戳严格排序的,不同Partition间不保证消息间的排序,这也就意味着一个消费实例分配多个Partition时,这多个Partition间的消息的顺序无法预料。所以一般推荐,就是把属于同组的消息就发送到同一个Partition,并且最好是Partition和消费实例一一对应,举个例子,我们可以把事件的Key设置为用户的ID,那所有该用户的事件都存在于同一个Partition内,这显然更方便消费端进行处理

消费组的负载平衡

上个小节只是说明消费组的概念和最理想的分区分配情况,当消费组内实例的数量和分区数不一样,或者消费组内有实例停止工作时,如何给消费组内的实例进行Partition的分配就是需要一套详细的重新分配负载平衡策略。完整的Kafka负载平衡协议很复杂,不过有几个最基本的原则:

  • 保证任何一个Topic的Partition只会被一个消费组内的实例消费
  • 一个消费者实例可以被分配多个Partition
  • 消费组内被分配Partition的数量 ≤ \leq Partition的数量

仍然用上面的示意图进行演示。假设最开始的时候只启动了两个消费者实例,那么Kafka按照某个分配算法将3个Partition这样分配给它们:
请添加图片描述
现在多运行一个消费者实例,这将触发Partition的负载平衡,3个消费者实例都会被分配到一个Partition
请添加图片描述
这时我们再开启一个消费者实例,这时被分配的消费者的数量已经等于Parition的数量,所以负载平衡不会被触发,多出的这个消费者实例处于"未激活"状态
请添加图片描述
这时假设消费实例1因为故障停止了工作,Kafka检测到该情况,于是一次负载平衡被触发,剩下的三个实例将被成功分配到一个Partition
请添加图片描述
这种动态分配是默认的负载均衡策略,在此基础上,Kafka也提供了静态成员Static Membership的功能,就是可以强制指定分配固定的Partition,无论负载均衡如何被触发,这种策略在某些情况下比动态分配资源消耗更小

4. 备份 Replication

为了提供稳定的分布式集群环境,Kafka对Replication进行了精心的设计。一句话描述Replication的过程,就是在集群内多个Broker之间互相备份Partition

在创建Topic的时候,对于Replication有个关键的参数叫拷贝因子(Replication Factor),表示这个Topic下的Partition需要有几个备份。在下面的示意图里,我们指定一个Topic有3个Partition,而Replication Factor指定为1;这种情况下,其实就是没有启用备份机制,每个Partition只有自身,各自存在于不同的Broker内
请添加图片描述
如果把Replication Factor设置为2,那么每个Partition在Broker集群中会有两个备份,分布在不同的Broker中;并且Kafka的Replication遵从主从机制,这两个备份有一个作为主备份Leader,另一个作为从备份Followers,生产或者消费数据,是从主备份执行,而从备份则是跟在主备份后面,持续同步主备份的数据
请添加图片描述
最后,Replication Factor不像Partition那样可以设置任意数量,它不能大于Broker的数量,毕竟如果备份数比集群内的Broker的数量还多,那么必然使得某些Broker内存在多个相同的备份,这是没有必要的

5. 数据保留与清理

虽然日志文件形式的数据存储给Kafka带来了性能上的优势,但是在对数据的保留与清理上,也带来一些特殊性:

  • 不像RabbitMQ之类把消息存放在典型的消息队列上,消费者每次消费一条数据后就自动把数据丢弃了,Kafka的数据是持久化中心化的,消费者只拥有读取数据的权力,不能指定丢弃数据,因为数据还要给其他消费者读取;因此只能由Kafka自己按照一定的数据保留策略对旧数据进行清理
  • 数据存放在日志文件上,追加数据很方便,但是要删除早期的数据却很麻烦,我们无法直接既删除一个文件前面的数据,同时又支持生产者继续在文件末尾追加数据

分段 Segment

为了支持数据的清理,分段Segment的概念自然而然被引入,其实就是对Partition进一步划分,一个Partition其实是由多个连续Segment子文件构成的
请添加图片描述

  • 最新的那个Segment是Partition内处于激活状态的Segment,Kafka的消息只会写入到激活态的Segment,当写入的数据达到Segment的容量限制,一个新的Segment文件被创建并激活,接收新的数据
  • Segment的名字是以它内部第一条消息的偏移量进行命名的,所以上面的示意图里Segment的命名不是0,1,2而是0,3,6
  • 使用Segment后,删除旧数据变得简单了,Kafka可以删除掉之前的Segment文件,而新数据的追加不受影响

日志压制 Log Compaction

直接删除旧的消息日志是节省磁盘空间最直接的做法,不过Kafka也提供了另一种数据保留策略,就是日志压制 Compaction,很多中文文档可能会翻译为日志压缩,但是严格说Compaction和真正的压缩是有区别的,用Compaction的直译"压制"应该更准确

某些事件是可以用最新的已知值来表示的,像下面这个例子,事件的Key是用户的ID,而Value是该用户的邮箱地址,用户可能会多次更新他的邮箱,但是显然我们不需要知道中间更新的过程,只需要知道最后一次更新后的邮箱值"bill@gmail.com"

# (key,value), key is user id, value is e-mail
(123,bill@microsoft.com)
-> (123,bill@gatesfoundation.org)
-> (123,bill@gmail.com)

这就是日志压制的原理,对于相同Key的记录,只保留最新的事件,这样既释放了磁盘空间,也可以保留数据最新的镜像。当然,Compaction需要考虑应用场景,如果数据是独立的以时间戳为主的"日志"式事件,或者事件没有指定Key,或者消费者就是需要完整的事件记录,那么Compaction是不适合的。请添加图片描述
不过具体到磁盘层面,Compaction不会像删除操作一样直接在Segment上进行,Kafka会启动一个后台线程,逐次重拷贝Segment进行Compaction,然后更新Segment的连接地址

数据保留策略的配置

正如上面所述,要主动控制Kafka内消息的保留或删除是不可行的,这个工作由Kafka内部的数据清理线程进行,我们能做的只是指定相关的数据保留配置,清理线程会按照配置采用合适的策略处理旧的数据,但我们无法精确预料到数据何时被删除(或者被压制).

Kafka的数据保留策略分两类,一类是基于时间的,也就是指定事件的生命周期,当清理线程检测到事件到期,会将其标记为删除或压制(但不一定立即就执行);一类是基于空间的,也就是指定Partition的容量,当Partition过大时对早期的Segment进行清理。一般来说基于空间的策略优先级会更高,毕竟如果磁盘占用量已经超限,那么Kafka将必需立即清理旧的数据来保证运行

具体到详细的配置项的话,就不是太容易整理了,因为存在很多和数据保留相关的配置项,而且这些配置项之间的优先级各有高低,下面只列出常用的一些:

首先在Broker配置层面会有一些全局性的配置项,相当于是所有Topic下事件默认的清理策略

配置项说明默认值
log.cleanup.policy采用哪种方式清理,删除还是压制delete
log.retention.check.interval.ms清理线程的运行频率3000 ms
log.cleaner.delete.retention.ms标记为delete的事件可以保留多久86400000 (1 天)
log.retention.ms事件的保留事时间(单位 ms),过了这个时间会被标记为Delete
log.retention.minutes事件的保留事时间(单位 minute),优先级比log.retention.ms低
log.retention.hours事件的保留事时间(单位 hour),优先级比log.retention.minutes 低
log.retention.bytes事件日志(Partition)的字节大小限制, 超过这个限制会真正执行Delete操作-1
log.segment.bytes事件日志(Segment)的字节大小限制1073741824 (1 GB)

不过显然很多情况下我们是希望不同的Topic有不同的清理策略,所以在Kafka上创建Topic的时候,可以对每个Topic设置自己的清理策略配置项,这些配置项的名字相比于Broker级别基本就是把log.前缀去掉,例如cleanup.policy, delete.retention.ms和retention.ms

举报

相关推荐

0 条评论