kafka的基本概念
- 一、基本概念
- 1、broker
- 2、producer
- 3、consumer
- 4、cousumer group
- 5、topic
- 6、partition
- 7、replicas
- 二、生产者分区策略
- 1、默认分区策略
- 2、自定义分区策略
- 三、生产者 acks 应答机制
- 1、acks = 0
- 2、acks = 1
- 3、acks = [all | -1]
- 四、日志文件的 HW 和 LEO
- 1、HW
- 2、LEO
- 3、举例说明
- 五、消费者的分区策略
- 六、消费者offset的维护
- 1、自动提交offset
- 2、手动提交offset
- 3、自定义提交offset
- 七、拦截器
- 1、生产者拦截器
- 2、消费者拦截器
- 八、参考文档
一、基本概念
1、broker
broker 指的一个kafka服务器,一个kafka集群是由多个 kafka broker 组成。
2、producer
producer 指的是消息生产者,即发送消息到 kafka broker 的客户端。
3、consumer
consumer 指的消息消费者,即从 kafka broker 获取消息的客户端。
4、cousumer group
consumer group 指的是消费者组,拥有相同的 group id 的消费者构成一个消费者组。
- 消费者组与消费者之间互不影响。
- 消费者组内的每个消费者负责消费不同的分区的数据。
-
一个分区只能有同一个分区中的一个消费者消费。
5、topic
topic 指的是主题。生产者生产消息、消费者消费消息,都需要指定一个topic。
6、partition
partition 指的是分区。
- 一个
topic 可以存在多个分区。 - 一个分区,只能被同一个消息者组中的某一个消费者消费。
-
每个分区上的消息都是有序的,但是主题(topic)上的消息不是有序的。 - 每个分区可能存在多个
follower,其中负责读/写的是leader。 - 每个
kafka broker可以是当前分区的leader,也可以是其它分区的follower。 - 多个分区可以提高程序的并发性,因为一个分区只能一个分区消息,多个分区可以多个消费者同时消费。
7、replicas
replicas 指的是副本。数据冗余,保证集群的可用性。
-
leader 指的是topic主题的分区中,每个分区的 主 。生产者发送数据、消费者消费数据都是从 leader分区 中操作的。 -
follower 指的是 topic主题的分区中,每个分区的 从。负责从leader分区同步数据,当 leader 宕机时,follower 可能成为新的 leader。默认是从 ISR 中进行选举。设置unclean.leader.election.enable = true可以从非ISR节点中进行选举,这样可以导致丢失数据,默认值是false,建议是false。 -
ISR(in-sync replicas)
- 与leader保持同步的follower集合
- 如果follower在超过
replica.lag.time.max.ms 毫秒,没有与leader进行同步,则踢出ISR。
-
OSR(out-sync replicas)
- 落后leader太多的副本
-
AR
- ISR + OSR
是否允许非ISR的副本参与选举。
二、生产者分区策略
即生产者,发送发送消息后,该消息保存到kafka broker上的topic上的那个partition上。
1、默认分区策略
默认的分区策略使用的是 org.apache.kafka.clients.producer.internals.DefaultPartitioner 这个类。
即我们代码中使用org.apache.kafka.clients.producer.ProducerRecord发送消息时。
- 在指定了
partition 的情况下,直接使用指定的 分区。 - 没有指定
partition 但指定了key的情况下,将key的hash值与topic的partition
数进行取余得到partition值。 - 没有指
partition和key,在一个批次满的时候会发送到同一个分区,当一个新的批次创建时,会发送到另外的分区中。可以通过KIP-480 获取更多的粘性分区知识。
2、自定义分区策略
- 实现
org.apache.kafka.clients.producer.Partitioner 接口。 - 代码实现
Properties prop = new Properties();
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"自定义实现的分区器的全路径");
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);三、生产者 acks 应答机制
1、acks = 0
producer不等待broker返回ack信息,只是将消息加入到socket的缓冲中,可能还未发送出去。
- 重试(
retries)不会生效。 - 为每个记录返回的偏移量始终是
-1。 - 可能会存在
丢失数据的情况。
2、acks = 1
producer等待broker返回ack信息,只要这个partition的leader将消息成功保存后返回ack。
- 如果
leader返回ack后,但是还未同步给follower,此时leader宕机了,可能会出现丢失数据的情况。
3、acks = [all | -1]
producer等待broker返回ack信息,partition的leader和follower全部将消息保存成功后,才返回ack信息。
- 如果在
leader和follower将消息都保存完毕,但是在返回ack之前,leader宕机了,此时可能导致 消息重复。 -
此处指的是 所有的follower 都不同完,而不是半数以上同步完,才发送ack。
- 因为,当需要容忍 `n` 台节点故障时,`半数以上` 需要`2n+1`个副本,而`所有follower`都同步完,只需要`n+1`个副本。四、日志文件的 HW 和 LEO
1、HW
HW : 即 High Watermark,指的是该分区(partition)中所有副本(leader+follower)中最小的LEO。
- 上面中所有的副本:指的是 该分区的
ISR集合。 -
HW 之前的消息是可消费的,比如 HW=5,那么可以消费的 offset 是 [0-4] 不包括5.
2、LEO
LEO :即 Log End Offset,即当前日志文件中,下一条消息待写入的 offset。
3、举例说明
比如,一个分区下有3个ISR,一个leader和2个follower。
leader 的leo 是 6
follower01 的leo是 5
follower02 的 leo 是 4
那么该分区的 hw 是 4,即最小的leo。

五、消费者的分区策略
我们知道,我们的消费者(consumer)是隶属于消费者组(consumer group)的,消费者组中可以存在多个消费者,每个消费者可以消费多个主题(topic),每个主题又存在多个分区(partition),每个分区只能由一个消费者来消费,每个消费者又可以消费多个分区。那么就必然涉及到 如何将某个分区分配给那个消费者消费。
1、RangeAssignor - 基于订阅的 topic来分配
默认分区策略 2、RoundRobinAssignor - 基于consumer group来分配,可能出现误消费别的主题的情况
eg:
topicA 存在 0,1,2 三个分区
topicB 也存在 0,1,2三个分区
consumerA 和 consumerB 同属于一个组
consumerA 订阅 topicA
consumerB 订阅 tocpiA 和 topicB
此时按照 RoundRobinAssignor 策略,会先将所有的分区进行排序,则会产生6个主题分区对象(`TopicPartition`),因为topicA和topicB 每个主题共有3个分区。此时就有可能将topicB的消息发送给了cousumerA,导致消费错误。3、StickyAssignor -
4、CooperativeStickyAssignor -
六、消费者offset的维护
1、自动提交offset
Properties prop = new Properties();
// 设置自动提交offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 自动提交offset的间隔,单位ms
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(prop);2、手动提交offset
consumer.commitAsync() : 异步提交
consumer.commitSync() : 同步提交
Properties prop = new Properties();
// 设置手动提交offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Arrays.asList("topicA", "topicB"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.out.println("offset: " + record.offset() + "value:" + record.value());
}
// 异步提交offset
consumer.commitAsync();
// 同步提交offset
// consumer.commitSync();
}3、自定义提交offset
需要实现 org.apache.kafka.clients.consumer.ConsumerRebalanceListener 接口,在消费者发生 rebalance 时,保存或获取自定义的offset。
七、拦截器
1、生产者拦截器
实现org.apache.kafka.clients.producer.ProducerInterceptor接口。可以实现消息发送到kafka broker之前的消息拦截。
1、onSend(ProducerRecord<K, V> record)方法
该方法运行在主线程中,我们可以在该方法中对消费进行任何操作,但是最好不要修改topic和partition,否则可能影响消息目标分区的计算。
2、onAcknowledgement(RecordMetadata metadata, Exception exception)
该方法运行在producer 的 I/O线程中,因此不要执行一些比较耗时的操作,否则会拖慢producer的发送消息的速度。该方法在消息发送kafka broker之后返回ack之后执行或发送到kafka broker的过程中发生异常执行。
需要自己保证实现的拦截器的线程安全问题。
Properties prop = new Properties();
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现ProducerInterceptor类的全类名"));
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);2、消费者拦截器
实现org.apache.kafka.clients.consumer.ConsumerInterceptor类,可以对从kafka broker获取到的消息进行拦截。和consumer#poll运行在同一个线程中。
Properties prop = new Properties();
prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现ConsumerInterceptor类的全类名"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);八、参考文档
1、http://kafka.apache.org/documentation/#gettingStarted










