Kafka学习之路
一、Kafka的简介
1.1、概述
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
主要应用场景是:日志收集系统和消息系统。
Kafka主要设计目标如下:
—消息持久化:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
—高吞吐率:即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
—分布式:支持KafkaServer间的消息分区,及分布式消息,同时保证每个partition内的消息顺序传输。
—跨平台:支持不同技术平台的客户端(如:Java、PHP、Python等)。
—实时性:同时支持离线数据处理和实时数据处理。
—伸缩性:支待在线水平扩展。
1.2、消息系统介绍
一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数在两个或多个应用间是如何传递的。
分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。
有两种方要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。Kafka就是一种发布-订阅模式。
1.3、点对点消息传递模式
在点对点消息系统中,消息持久化到一个队列中,此时将有一个或多个消费者消费队列中的数据。
但是一条消息只能被消费一次,当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。
该模式即使有多个消息者同时消费数据,也能保证数据处理的顺序。
这种架构描述示意图如下:
生产者发送一条消息到queue中,只有一个消费者能收到。
1.4、发布-订阅消息传递模式
在发布-订阅消息系统中,消息持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中的所有数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。
在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。
该模式的示例图如下:
发布者发送到topic中的消息,只有订阅了topic的订阅者才会收到消息。
二、Kafka的优点
2.1、解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其因难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两过的处理过程,只要确保它们遵守同样的接口约束。
Kafka具备消息系统的优点,只要生产者和消费者数据两端遵循接口约束,就可以自行扩展或修改数据处理的业务过程。
2.2、冗余(副本)
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成数据丢失。
消息队列把数据进行持久化,直到它们已经被完全处理,通过这一方式规避了数据丢失内险。
许多消息队列所采用的“插入-获取-删除”模式,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完成,从而确保你的数据被安全的保存直到你使用完毕。
2.3、扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电子按钮一样简单。
2.4、灵活性与峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见,所以为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。
使用消息队列能够使关键组件顶住突发的访问能力,而不会因为突发的超负荷的请求而完全崩溃。
2.5、可恢复性
系统的一部分组件失败时,不会影响到整个系统运行。消息队列降低了进程间的耦合度,即使一个处理消息的进程挂掉,加入到队列中的消息仍然可以在系统恢复后被处理。
2.6、顺序保证
在大多数使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内消息的有序性。
2.7、缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如:加载一张图片比应用过滤器花费更少的时间。
消息队列通过一个缓冲层来帮助任务最高效率的执行,使写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。
2.8、异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时想再去处理它们。
三、常用MessageQueue对比
名称 功能 | ActiveMQ | RocketMQ | RabbitMQ | Kafka | |
定位 | 设计定位 | 可靠消息传输 | 非日志的可靠消息传输 | 可靠消息传输 | 实时数据处理以及日志处理 |
基础对比 | 成熟度 | 成熟 | 成熟 | 成熟 | 日志领域成熟 |
所属社区/公司 | Apache | Alibaba开发现加入Apache | Apache | Apache | |
社区活跃度 | 高 | 中 | 高 | 高 | |
API完备性 | 高 | 高 | 高 | 高 | |
开发语言 | Java | Java | ErLang | Scala | |
支持协议 | OpenWire、STOMP、REST、XMMP、AMQP | 自己定义的一套(社区提供JMS—不成熟) | AMQP | 一套自行设计的基于TCP的二进制协议 | |
持久化方式 | 内存/文件/数据库 | 磁盘文件 | 内存/文件 | 磁盘文件 | |
客户端支持语言 | Java、C、C++、Python、PHP、Perl、.net等 | Java、C++(不成熟) | Java、C、C++、Python、PHP、Perl、.net等 | C、C++、Python、Go、PHP等 | |
可用性/性 能比较 | 部署方式 | 单机/集群 | 单机/集群 | 单机/集群 | 单机/集群 |
集群管理 | 独立 | nameserver | 独立 | zookeeper | |
选主方式 | 基于zookeeper+leavelDB的master-slave方式 | 支持多master模式,多master多slave模式,异步复制模式 | Master提供服务,slave提供备份 | 支持多副本机制,leader宕机,flower自动顶上重新选举leader | |
可用性 | 高 | 非常高 | 高 | 非常高 | |
消息写入性能 | 较好 | 很好 | 较好 | 非常好 | |
单机队列数 | 较好 | 单机最高5万 | 依赖内存 | 单机超过64个队列或分区会出现飙高 | |
功能对比 | 事务消息 | 支持 | 支持 | 不支持 | 不支持 |
消息过滤 | 不支持 | 支持 | 不支持 | 不支持 | |
消息查询 | 不支持 | 支持 | 不支持 | 不支持 | |
消息失败重试 | 支持 | 支持 | 支持 | 不支持 | |
消息重新消费 | 支持 | 支持 | 不支持 | 支持 | |
消费方式 | Consumerpull | Consumerpull | Brokerpush | Consumerpull | |
批理发送 | 支持 | 支持 | 不支持 | 支持 | |
消息清理 | 指定文件保存时间过期删除 | 指定文件保存时间过期删除 | 可用内存少于40%触发gc | 指定文件保存时间过期删除 | |
总结 | 1、成熟的产品,已经在很多公司得到应用(非大规模场景)有较多的文档。 2、各种协议支持较好,有多种语言的成熟的客户端。 | 1、在高吞吐、低延迟、高可用、集群热扩展、集群容错上有非常好的表现。 2、Api系统设计都更适合在业务处理的场景。 3、支持多种消费方式。 4、提供消息顺序消费能力,consumer可以水平扩展,消费能力强。 | 1、在高吞吐量、高可用上不好kafka和rocketMQ。 2、支持多种客户端语言。 3、由于erlang语言特性,性能比较好,使用RAM模式时,性能很好。 | 1、在高吞吐、低延迟、高可用、集群热扩展、集群容错上有非常好的表现。 2、Producer端提供缓存、压缩功能,可节省性能,提高效率。 3、提供顺序消费。 4、生态完善,在大数据处理方面有大量配套设施。 |
四、Kafka中的术语解释
4.1、概述
在深入理解Kafka之前,先介绍一下Kafka中的术语。下图是Kafka的相关术语以及之间的关系:
上图中一个topic配置了3个partition。Partition1有2个offset:0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。
如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。
4.2、broker
Kafka集群包含一个或多个服务器,服务器节点称为broker。
broker存储topic的数据,如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况发生,这种情况容易导致Kafka集群数据不境外衡。
4.3、Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的Topic即可生产或消费数据,而不必关心数据存于何处。
4.4、Partition
topic中的数据分割为一个或多个partition,每个topic至少有一个partition,每个partition中的数据使用多个segment文件存储。
partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序,在需要严格保证消息消费顺序的场景下,需要把partition数据设为1。
4.5、Producer
生产者即数据发布者,该角色将消息发布到Kafka的topic中,broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
4.6、Consumer
消费者可以从broker中读取数据,消费者也可以消费多个topic中的数据。
4.7、ConsumerGroup
每个Consumer属于一个特定的ConsumerGroup(可为每个Consumer指定groupname,若不指定groupname,则属于默认的group)。
4.8、Leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写partition。
4.9、Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。
如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“insyncreplicas”(ISR)列表中删除,重新创建一个Follower。
五、Kafka的架构
所上图所示,一个典型的Kafka集群中包含若干个Producer(可以是web前端产生的PageView,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高)。
Kafka通过Zookeeper管理集群配置,选举leader,以及在ConsumerGroup发生变化时进行rebalance。
Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
5.1、Topics和Partition
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放时哪个queue里。
为了使得Kafka的吞吐率可以以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。
创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是是需要的资源也越多,同时也会导致更高的不可用性,Kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
对于传统的messagequeue而言,一般会删除已经被消费的的消息,而Kafka集群会保留所有的的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置如下所示:
#Theminimumageofalogfiletobeeligiblefordeletion log.retention.hours=168 #Themaximumsizeofalogsegmentfile.Whenthissizeisreachedanewlogsegmentwillbecreated. log.segment.bytes=1073741824 #Theintervalatwhichlogsegmentsarecheckedtoseeiftheycanbedeletedaccordingtotheretentionpolicies log.retention.check.interval.ms=300000 #Iflog.cleaner.enable=trueissetthecleanerwillbeenabledandindividuallogscanthenbemarkedforlogcompaction. log.cleaner.enable=false |
因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与得高Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。
另外,Kafka会为每一个ConsumerGroup保留一些metadata信息—当前消费的消息的position,也即offset,这个offset由Consumer控制,正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offset由Consumer控制,所以Kafkabroker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过borker去保证同一个ConsumerGroup只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。
5.2、Producer消息路由
Producer发送消息到broker时,会根据Partition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入到不同broker的不同Partition里,极大的提高了吞吐率。
可以在$KAFKA_HOME/config/server.properties中通过配置项num.partittions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。
在发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Parition。Paritition机制可以通过指定Producer的paritition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。
5.3、ConsumerGroup
使用ConsumerhighlevelAPI时,同一Topic的一条消息只能被同一个ConsumerGroup内的一个Consumer消费,但多个ConsumerGroup可同时消费这一消息。
这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个ConsumerGroup。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用ConsumerGroup还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的ConsumerGroup即可。
5.4、Pullvs.Pull
作为一个消息系统,Kafka遵循了传统的方式,选择由Producer向brokerpush消息并由Consumer从brokerpull消息。一些logging-centricsystem,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式和pull模式各有优劣。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
5.5、Kafkadeliveryguarantee
有这么几种可能的deliveryguarantee:
Atmostonce | 消息可能会丢,但绝不会重复传输 |
Atleastone | 消息绝不会丢,但可能会重复传输 |
Exactlyonce | 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的 |
当Producer向broker发送消息时,一旦这条消息被commit,因为replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactlyonce。
接下来讨论的是消息从broker到Consumer的deliveryguarantee语义。(仅针对KafkaconsumerhighlevelAPI)。Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactlyonce。但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的deliveryguaranteesemantic。
Kafka默认保证Atleastonce,并且允许通过设置Producer异步提交来实现Atmostonce。而Exactlyonce要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接、非常容易得使用这种方式。
六、Kafka的高可用
6.1、高可用的由来
6.1.1、为何需要Replication
Kafka在0.8以前的版本中,是没有Replication的,一旦某一个Broker宕机,则其上所有的Partition数据都不可以被消费,这与Kafka数据持久性及DeliveryGuarantee的设计目标相悖,同时Producer都不能再将数据存于这些Partition中。
如果Producer使用同步模式,则Producer会在尝试重新发送message.send.max.retries(默认值为3)次后抛出Exception,用户可以选择停止发送后续数据也可以选择继续选择发送。而前者会造成数据的阻塞,后者会造成本应发往该Broker的数据丢失。
如果Producer使用异步模式,则Producer会尝试重新发送message.send.max.retries(默认值为3)次后记录该异常,并继续发送后续数据,这会造成数据丢失,并且用户只能通过日志发现该问题。同时,Kafka的Producer并未对异常模式提供callback接口。
由此可见,在没有Replication的情况下,一旦某机器宕机或者某个Broker停止工作,则会造成整个系统的可用性降低。随着集群规模的增加,整个集群中出现该类异常的几率大大增加,因此对于生产系统而言Replication机制的引入非常重要。
6.1.2、LeaderElection
引入Replication之后,同一个Partition可能会有多个Replica,而这时需要在这些Replication之间选出一个Leader,Producer和Consumer只与这个Leader交互,其它Replica作为Follower从Leader中复制数据。
因为需要保证同一个Partition的多个Replica之间的数据一致性(其中一个宕机后其它Replica必须要能继续服务,并且即不能造成数据重复也不能造成数据丢失)。如果没有一个Leader,所有Replica都可同时读/写数据,那就需要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了Replication实现的复杂性,同时也增加了出现异常的几率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。
6.2、KafkaHA设计解析
6.2.1、如何将所有Replica均匀分布到整个集群
为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。
一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提高Kafka的容错能力,也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。
Kafka分配Replica的算法如下:
1.将所有Broker(假设共n个Broker)和待分配的Partition排序。
2.将第i个Partition分配到第(imodn)个Broker上。
3.将第i个Partition的第j个Replica分配到第((i+j)moden)个Broker上。
6.2.2、DataReplication(副本策略)
Kafka的高可靠性的保障来源于其健壮的副本(replication)策略。
6.2.2.1、消息传递同步策略
Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的ReplicationFactor为多少,Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log,每个Follower都从Leaderpull数据,这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。
为了提高性能,每个Follower在接收到数据后就会立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。
Consumer读消息也是从Leader读取,只有被commit过的消息才会暴露给Consumer。
KafkaReplication的数据流如下图所示:
6.2.2.2、ACK前需要保证有多少个备份
对于Kafka而言,定义一个Broker是否“活着”包含两个条件:
—一是它必须维护与ZooKeeper的session(这个通过ZooKeeper的Heartbeat机制来实现)。
—二是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”。
Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-syncReplica)。如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预定值(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.max.messages配置,其默认值是4000)或者Follower超过一定时间(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms来配置,其默认值是10000)未向Leader发送fetch请求。
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,完全同步复制要求所有能工作的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。
需要说明的是,Kafka只解决fail/recover,不处理“Byzantine”(“拜占庭”)问题。一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。
6.2.2.3、LeaderElection算法
Leader选举本质上是一个分布式锁,有两种方式实现基于ZooKeeper的分布式锁:
—节点名称唯一性:多个客户端创建一个节点,只有成功创建节点的客户端才能获得锁。
—临时顺序节点:所有客户端在某个目录下创建自己的临时顺序节点,只有序号最小的才获得锁。
一种非常常用的选举leader的方式是“MajorityVote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个Replica(包含Leader和Follower),那在commit之前必须保证有f+1个Replica复制完消息,为了保证正确选出新的Leader,fail的Replica不能超过f个。因为在剩下的任意f+1个Replica里,至少有一个Replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几个Broker,而非最慢那个。
MajorityVote也有一些劣势,为了保证LeaderElection的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的Replica,如果要容忍2个Follower挂掉,必须要有5个以上的Replica。
也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的Replica,而大量的Replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在ZooKeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HAFeature是基于majority-vote-basedjournal,但是它的数据存储并没有使用这种方式。
Kafka在ZooKeeper中动态维护了一个ISR(in-syncreplicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个Replica的失败,MajorityVote和ISR在commit前需要等待的Replica数量是一样的,但是ISR需要的总的Replica的个数几乎是MajorityVote的一半。
虽然MajorityVote与ISR相比有不需等待最慢的Broker这一优势,但是Kafka作者认为Kafka可以通过Producer选择是否被commit阻塞来改善这一问题,并且节省下来的Replica和磁盘使得ISR模式仍然值得。
6.2.2.4、如何处理所有Replica都不工作
在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
1.等待ISR中的任一个Replica“活”过来,并且选它作为Leader。
2.选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader。
这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(前文有说明,所有读写都由Leader完成)。
Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。
6.2.2.5、选举Leader
最简单最直观的方案是,所有Follower都在ZooKeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeralznode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(ZooKeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。
但是该方法会有3个问题:
1.split-brain这是由ZooKeeper的特性引起的,虽然ZooKeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致。
2.herdeffect如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整。
3.ZooKeeper负载过重,每个Replica都要为此在ZooKeeper上注册一个Watch,当集群规模增加到几千个Partition时ZooKeeper负载会过重。
Kafka0.8.*的LeaderElection方案解决了上述问题,它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比ZooKeeperQueue的方式更高效)通知需要为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
6.3、HA相关ZooKeeper结构
6.3.1、admin
该目录下znode只有在有相关操作时才会存在,操作结束时会将其删除。
/admin/reassign_partitions用于将一些Partition分配到不同的broker集合上。对于每个待重新分配的Partition,Kafka会在该znode上存储其所有的Replica和相应的Broker id。该znode由管理进程创建并且一旦重新分配成功它将会被自动移除。
6.3.2、broker
即/brokers/ids/[broker Id])存储“活着”的broker信息。
topic注册信息(/brokers/topics/[topic]),存储该topic的所有partition的所有replica所在的broker id,第一个replica即为preferred replica,对一个给定的partition,它在同一个broker上最多只有一个replica,因此broker id可作为replica id。
6.3.3、controller
/controller->int(broker id of the controller)存储当前controller的信息。
/controller_epoch->int(epoch)直接以整数形式存储controller epoch,而非像其它znode一样以JSON字符串形式存储。
6.4、producer发布消息
6.4.1、写入方式
producer采用push模式将消息发布到broker,每条消息都被append到partition中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障Kafka吞吐率)。
6.4.2、消息路由
producer发送消息到broker时,会根据分区算法选择将其存储到哪一个partition。其路由机制为:
1、指定了partition,则直接使用。
2、未指定partition但指定key,通过对key的value进行hash选出一个partition。
3、partition和key都未指定,使用轮询选出一个partition。
6.4.3、写入流程
producer写入消息序列图如下所示:
流程说明:
1、producer先从zookeeper的"/brokers/.../state"节点找到该partition的leader。 2、producer将消息发送给该leader。 3、leader将消息写入本地log。 4、followers从leader pull消息,写入本地log后leader发送ACK。 5、leader收到所有ISR中的replica的ACK后,增加HW(high watermark,最后commit的offset)并向producer发送ACK。 |
6.5、broker保存消息
6.5.1、存储方式
物理上把topic分成一个或多个partition(对应server.properties中num.partitinotallow=3配置),每个partition物理上对应一个文件夹(该文件夹存储该partition的所有消息和索引文件),如下:
6.5.2、存储策略
无论消息是否被消费,Kafka都会保留所有消息。有两种策略可以删除旧数据:
1、 基于时间:log.retention.hours=168
2、 基于大小:log.retention.bytes=1073741824
6.6、Topic的创建和删除
6.6.1、创建topic
创建topic的序列图如下所示:
流程说明:
1、controller在ZooKeeper的/brokers/topics节点上注册watcher,当topic被创建,则controller会通过watch得到该topic的partition/replica 分配。 2、controller从/brokers/ids读取当前所有可用的broker列表,对于set_p中的每一个partition: 2.1、从分配给该partition的所有replica(称为AR)中任选一个可用的broker作为新的leader,并将AR设置为新的ISR 2.2、将新的leader和SR写入/brokers/topics/[topic]/partitions/[partition]/state 3、controller通过RPC向相关的broker发LeaderAndISRRequest。 |
6.6.2、删除topic
删除topic的序列图如下所示:
流程说明:
1、controller在zooKeeper的/brokers/topics节点上注册watcher,当topic被删除,则controller会通过watch得到该topic的partition/replica分配。 2、若delete.topic.enable=false,结束;否则controller注册在/admin/delete_topics上的watch被fire,controller通过回调向对应的broker发送 StopReplicaRequest。 |
6.7、broker failover
Kafka broker failover序列图如下所示:
流程说明:
1、controller在zookeeper的/brokers/ids/[brokerId]节点注册Watcher,当broker宕机时zookeeper会fire watch 2、controller从/brokers/ids节点读取可用broker 3、controller决定set_p,该集合包含宕机broker上的所有partition 4、对set_p中的每一个partition 4.1、从/brokers/topics/[topic]/partitions/[partition]/state节点读取ISR 4.2、决定新leader 4.3、将新leader、ISR、controller_epoch和leader_epoch等信息写入tate节点 5、通过 PC向相关broker发送leaderAndISRRequest命令 |
6.8、controller failover
当controller宕机时会触发controller failover。每个broker都会在zookeeper的"/controller"节点注册watcher,当controller宕机时zookeeper 中的临时节点消失,所有存活的broker收到fire的通知,每个broker都尝试创建新的controller path,只有一个竞选成功并当选为controller。
当新的controller当选时,会触发KafkaController.onControllerFailover方法,在该方法中完成如下操作:
1、读取并增加Controller Epoch。 2、在reassigned-Partitions Patch(/admin/reassign_partitions)上注册watcher。 3、在preferred-Replica-Election Path(/admin/preferred_replica_election)上注册watcher。 4、通过partition-State-Machine在broker Topics Patch(/brokers/topics)上注册watcher。 5、若delete.topic.enable=true(默认值是 false),则partition-State-Machine在Delete Topic Patch(/admin/delete_topics)上注册watcher。 6、通过replica-State-Machine在Broker Ids Patch(/brokers/ids)上注册Watch。 7、初始化Controller-Context对象,设置当前所有topic,“活”着的broker列表,所有partition的leader及ISR等。 8、启动replica-State-Machine和partition-State-Machine。 9、将broker-State状态设置为Running-As-Controller。 10、将每个partition的Leadership信息发送给所有“活”着的broker。 11、若auto.leader.rebalance.enable=true(默认值是true),则启动partition-rebalance线程。 12、若delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。 |
七、Kafka的安装(集群为例)
7.1、安装前提(Zookeeper安装)
下载地址:
https://downloads.apache.org/zookeeper/
此处用此版本进行安装apache-zookeeper-3.5.9-bin.tar.gz
7.1.1、上传安装包,并解压到指定目录
此处用/kafka做为安装目录,分别把zookeeper在kafka01、kafka02上进行解压。
[root@kafka01 kafka]# tar -xvf apache-zookeeper-3.5.9-bin.tar.gz -C /kafka
7.1.2、修改配置文件
1、把模板配置文件进行改名
[root@kafka01 zookeeper]# cp ./conf/zoo_sample.cfg ./conf/zoo.cfg
2、修改配置文件参数
[root@kafka01 conf]# vi zoo.cfg
在配置文件的最下面一行添加以下内容
# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/kafka/zookeeper/data # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 dataLogDir=/kafka/zookeeper/log server.1=kafka01:2888:3888 server.2=kafka02:2888:3888 |
3、配置文件参数说明
3.1、基本配置选项
tickTime:心跳基本时间单位,毫秒级,ZK基本上所有的时间都是这个时间的整数倍。 initLimit:tickTime的个数,表示在leader选举结束后,followers与leader同步需要的时间,如果followers比较多或者说leader的数据非常多时,同步时间相应可能会增加,那么这个值也需要相应增加。当然,这个值也是follower和observer在开始同步leader的数据时的最大等待时间(setSoTimeout) syncLimit:tickTime的个数,这时间容易和上面的时间混淆,它也表示follower和observer与leader交互时的最大等待时间,只不过是在与leader同步完毕之后,进入正常请求转发或ping等消息交互时的超时时间。 dataDir:内存数据库快照存放地址,如果没有指定事务日志存放地址(dataLogDir),默认也是存放在这个路径下,建议两个地址分开存放到不同的设备上。 clientPort:配置ZK监听客户端连接的端口 server.serverid=host:tickpot:electionport server:固定写法 |
3.2、高级配置选项
dataLogDir:将事务日志存储在该路径下,比较重要,这个日志存储的设备效率会影响ZK的写吞吐量。 globalOutstandingLimit:(Java system property: zookeeper.globalOutstandingLimit)默认值是1000,限定了所有连接到服务器上但是还没有返回响应的请求个数(所有客户端请求的总数,不是连接总数),这个参数是针对单台服务器而言,设定太大可能会导致内存溢出。 preAllocSize:(Java system property: zookeeper.preAllocSize)默认值64M,以KB为单位,预先分配额定空间用于后续transactionlog 写入,每当剩余空间小于4K时,就会又分配64M,如此循环。如果SNAP做得比较频繁(snapCount比较小的时候),那么请减少这个值。 snapCount:(Java system property: zookeeper.snapCount)默认值100,000,当transaction每达到snapCount/2+rand.nextInt(snapCount/2)时,就做一次SNAPSHOT,默认情况下是50,000~100,000条transactionlog就会做一次,之所以用随机数是为了避免所有服务器可能在同一时间做snapshot. traceFile (Java system property: requestTraceFile) maxClientCnxns:默认值是10,一个客户端能够连接到同一个服务器上的最大连接数,根据IP来区分。如果设置为0,表示没有任何限制。设置该值一方面是为了防止DoS攻击。 clientPortAddress:与clientPort匹配,表示某个IP地址,如果服务器有多个网络接口(多个IP地址),如果没有设置这个属性,则clientPort会绑定到所有IP地址上,否则只绑定到该设置的IP地址上。 minSessionTimeout:最小的session time时间,默认值是2个tick time,客户端设置的session time 如果小于这个值,则会被强制协调为这个最小值。 maxSessionTimeout:最大的session time 时间,默认值是20个tick time. ,客户端设置的session time 如果大于这个值,则会被强制协调为这个最大值。 |
3.3、集群配置选项
electionAlg:领导选举算法,默认是3(fast leader election,基于TCP),0表示leader选举算法(基于UDP),1表示非授权快速选举算法(基于UDP),2表示授权快速选举算法(基于UDP),目前1和2算法都没有应用,不建议使用,0算法未来也可能会被干掉,只保留3(fast leader election)算法,因此最好直接使用默认就好。 initLimit:tickTime的个数,表示在leader选举结束后,followers与leader同步需要的时间,如果followers比较多或者说leader的数据灰常多时,同步时间相应可能会增加,那么这个值也需要相应增加。当然,这个值也是follower和observer在开始同步leader的数据时的最大等待时间(setSoTimeout) syncLimit:tickTime的个数,这时间容易和上面的时间混淆,它也表示follower和observer与leader交互时的最大等待时间,只不过是在与leader同步完毕之后,进入正常请求转发或ping等消息交互时的超时时间。 leaderServes:(Java system property: zookeeper.leaderServes) 如果该值不是no,则表示该服务器作为leader时是需要接受客户端连接的。为了获得更高吞吐量,当服务器数三台以上时一般建议设置为no。 cnxTimeout:(Java system property: zookeeper.cnxTimeout) 默认值是5000,单位ms 表示leaderelection时打开连接的超时时间,只用在算法3中。 |
3.3、ZK的不安全配置项
skipACL (Java systemproperty: zookeeper.skipACL) 默认值是no,忽略所有ACL检查,相当于开放了所有数据权限给任何人。 forceSync (Java systemproperty: zookeeper.forceSync) 默认值是yes, 表示transactionlog在commit时是否立即写到磁盘上,如果关闭这个选项可能会在断电时丢失信息。 jute.maxbuffer (Java system property: jute.maxbuffer)默认值0xfffff,单位是KB,表示节点数据最多1M。如果要设置这个值,必须要在所有服务器上都需要设置。 授权认证配置项 DigestAuthenticationProvider.superDigest (Java system property only: zookeeper.DigestAuthenticationProvider.superDigest) 设置这个值是为了确定一个超级用户,它的值格式为 super:<base64encoded(SHA1(idpassword))> ,一旦当前连接addAuthInfo超级用户验证通过,后续所有操作都不会checkACL. |
7.1.3、将修改好的配置文件发到集群中的其它机器
[root@kafka01 conf]# scp zoo.cfg root@kafka02:/kafka
注:然后是最重要的步骤,在各个ZooKeeper服务器节点,新建目录dataDir=/kafka/zookeeper/data和dataLogDir=kafka/zookeeper/log,这个目录就是你在zoo.cfg中配置的dataDir和dataLogDir的目录。
建好对应的目录之后,在dataDir对应的目录里面新建一个文件,文件名叫myid,里面存放的内容就是服务器的id,就是 server.1=kafka01:2888:3888当中的id, 就是1,那么对应的每个服务器节点都应该做类似的操作拿服务器kafka01举例:
[root@kafka01 zookeeper]# mkdir -p /kafka/zookeeper/data [root@kafka01 zookeeper]# cd ./data [root@kafka01 data]# echo 1>>myid |
当以上所有步骤都完成时,意味着我们
7.1.4、添加zookeeper环境变量
[root@kafka01 zookeeper]# vi /etc/profile.d/zookeeper.sh export ZOOKEEPER_HOME=/kafka/zookeeper export PATH=$ZOOKEEPER_HOME/bin:$PATH [root@kafka01 zookeeper]# source /etc/profile.d/zookeeper.sh |
7.1.5、启动zookeeper进行验证
[root@kafka01 zookeeper]# zkServer.sh start ZooKeeper JMX enabled by default Using config: /kafka/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED [root@kafka01 zookeeper]# zkServer.sh status ZooKeeper JMX enabled by default Using config: /kafka/zookeeper/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: follower |
7.1.6、查看启动进程
[root@kafka01 zookeeper]# jps 21891 QuorumPeerMain 15461 Jps |
注:其它的zookeeper的机器也都要进行相关操作进行启动、查看状态、进程是否正常。
7.2、Kafka安装包下载
下载地址:
http://kafka.apache.org/downloads.html
此处用此版本进行安装kafka_2.12-3.0.0.tgz。
7.2.1、上传安装包,并解压到指定目录
此处用/kafka做为安装目录,分别把kafka在kafka01、kafka02上进行解压。
[root@kafka01 kafka]# tar -xvf kafka_2.12-3.0.0.tgz -C /kafka
7.2.2、修改配置文件
[root@kafka01 kafka]# cd /kafka/kafka/config
主要关注server.properties这个配置文件即可,在配置文件的目录下,还有一个zookeeper的配置文件,我也可以根据kafka内带的zk集群来启动,但还是建议使用独立的zk集群。
//当前机器在集群中的唯一标识,和zookeeper的myid性质一样 broker.id=0 //当前kafka对外提供服务的端口默认是9092 listeners=PLAINTEXT://192.168.2.12:9092 //这个是broker进行网络处理的线程数 num.network.threads=3 //这个是broker进行I/O处理的线程数 num.io.threads=8 //发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后再发送,能提高性能 socket.send.buffer.bytes=102400 //kafka接收缓冲区大小,当数据到达一定大小后,再序列化到磁盘 socket.receive.buffer.bytes=102400 //这个参数是向kafka请求消息或者向kafka发送消息的请求的最大数,这个值不能超过java的堆栈大小 socket.request.max.bytes=104857600 //消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数,如果配置多个目录,当前以逗号分割 log.dirs=/kafka/kafka/log //默认的分区数,一个topic默认1个分区数 num.partitinotallow=1 //每个数据目录用来日志恢复的线程数目 num.recovery.threads.per.data.dir=1 //默认消息的最大持久化时间,168小时,7天 log.retention.hours=168 //这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值时,kafka会新起一个文件 log.segment.bytes=1073741824 //每隔300000毫秒去检查上面配置的log失败时间 log.retention.check.interval.ms=300000 //是否启动log压缩,一般不用启动,启用的话可以提高性能 log.cleaner.enable=false //设置zookeeper的连接端口(这里以集群为例,如果不是集群添加一个地址即可) zookeeper.cnotallow=192.168.2.12:2181,192.168.2.13:2181 //设置zookeeper的连接超时时间 zookeeper.connection.timeout.ms=18000 |
producer.properties
bootstrap.servers=192.168.2.12:9092,192.168.2.13:9092 |
7.2.3、启动kafka集群
[root@kafka01 kafka]# kafka-server-start.sh config/server.properties
集群中的所有kafka都进行启动,并检查日志有没有报错。
7.2.4、创建topic
[root@kafka01 kafka]# ./bin/kafka-topics.sh --create --bootstrap-server 192.168.2.12:9092 --replication-factor 2 --partitions 2 --topic topic2 Created topic topic2. |
7.2.5、查看topic副本信息
[root@kafka01 kafka]# ./bin/kafka-topics.sh --describe --bootstrap-server 192.168.2.12:9092 --topic topic2 Topic: topic2TopicId: xt-xQhz8QqmAZgbyuJA5cgPartitionCount: 2ReplicationFactor: 2Configs: segment.bytes=1073741824 Topic: topic2Partition: 0Leader: 1Replicas: 1,0Isr: 1,0 Topic: topic2Partition: 1Leader: 0Replicas: 0,1Isr: 0,1 |
7.2.6、生产者发送消息
在生产者一方,执行以下命令,并随便输入一些信息,让其生产。
[root@kafka01 ~]# kafka-console-producer.sh --broker-list 192.168.2.12:9092 --topic topic2 >hello kafka >1111 |
7.2.7、消费者消费消息
在消费端查看生产端生产的消息是否可以获取到。
[root@kafka02 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.2.12:9092 --from-beginning --topic topic2 1111 hello kafka |
消费者从生产者一端获取到了生产者输入的“111”、“hello kafka”,说明可以正常生产与消费了