0
点赞
收藏
分享

微信扫一扫

kafka的基本概念


kafka的基本概念

  • ​​一、基本概念​​
  • ​​1、broker​​
  • ​​2、producer​​
  • ​​3、consumer​​
  • ​​4、cousumer group​​
  • ​​5、topic​​
  • ​​6、partition​​
  • ​​7、replicas​​
  • ​​二、生产者分区策略​​
  • ​​1、默认分区策略​​
  • ​​2、自定义分区策略​​
  • ​​三、生产者 acks 应答机制​​
  • ​​1、acks = 0​​
  • ​​2、acks = 1​​
  • ​​3、acks = [all | -1]​​
  • ​​四、日志文件的 HW 和 LEO​​
  • ​​1、HW​​
  • ​​2、LEO​​
  • ​​3、举例说明​​
  • ​​五、消费者的分区策略​​
  • ​​六、消费者offset的维护​​
  • ​​1、自动提交offset​​
  • ​​2、手动提交offset​​
  • ​​3、自定义提交offset​​
  • ​​七、拦截器​​
  • ​​1、生产者拦截器​​
  • ​​2、消费者拦截器​​
  • ​​八、参考文档​​

一、基本概念

1、broker

​broker​​​ 指的一个​​kafka​​服务器,一个kafka集群是由多个 kafka broker 组成。

2、producer

​producer​​​ 指的是消息生产者,即发送消息到 ​​kafka broker​​ 的客户端。

3、consumer

​consumer​​​ 指的消息消费者,即从 ​​kafka broker​​ 获取消息的客户端。

4、cousumer group

​consumer group​​​ 指的是消费者组,拥有相同的 ​​group id​​ 的消费者构成一个消费者组。

  1. 消费者组与消费者之间互不影响。
  2. 消费者组内的每个消费者负责消费不同的分区的数据。
  3. ​一个分区只能有同一个分区中的一个消费者消费​​。

5、topic

​topic​​ 指的是主题。生产者生产消息、消费者消费消息,都需要指定一个topic。

6、partition

​partition​​ 指的是分区。

  1. 一个​​topic​​ 可以存在多个分区。
  2. 一个分区,只能被同一个消息者组中的某一个消费者消费。
  3. ​每个分区上的消息都是有序的​​​,但是主题(​​topic​​)上的消息不是有序的。
  4. 每个分区可能存在多个​​follower​​​,其中​​负责读/写​​​的是​​leader​​。
  5. 每个​​kafka broker​​​可以是当前分区的leader,也可以是其它分区的​​follower​​。
  6. 多个分区可以提高程序的并发性,因为一个分区只能一个分区消息,多个分区可以多个消费者同时消费。

7、replicas

​replicas​​ 指的是副本。数据冗余,保证集群的可用性。

  1. ​leader​​​ 指的是​​topic​​主题的分区中,每个分区的 ​​主​​ 。生产者发送数据、消费者消费数据都是从 ​​leader分区​​ 中操作的。
  2. ​follower​​​ 指的是 ​​topic​​主题的分区中,每个分区的 ​​从​​。负责从​​leader分区​​同步数据,当 leader 宕机时,follower 可能成为新的 leader。默认是从 ​​ISR​​ 中进行选举。设置​​unclean.leader.election.enable = true​​可以从非​​ISR​​节点中进行选举,这样可以导致丢失数据,默认值是​​false​​,建议是​​false​​。
  3. ​ISR(in-sync replicas)​
  1. 与leader保持同步的follower集合
  2. 如果follower在超过​​replica.lag.time.max.ms​​​ 毫秒,没有与leader进行同步,则踢出ISR。
    kafka的基本概念_kafka基本概念
  1. ​OSR(out-sync replicas)​
  1. 落后leader太多的副本
  1. ​AR​
  1. ISR + OSR

是否允许非ISR的副本参与选举。
kafka的基本概念_kafka拦截器_02

二、生产者分区策略

即生产者,发送发送消息后,该消息保存到​​kafka broker​​​上的​​topic​​​上的那个​​partition​​上。

1、默认分区策略

默认的分区策略使用的是 ​​org.apache.kafka.clients.producer.internals.DefaultPartitioner​​​ 这个类。
即我们代码中使用​​​org.apache.kafka.clients.producer.ProducerRecord​​发送消息时。

  1. 在指定了​​partition​​ 的情况下,直接使用指定的 分区。
  2. 没有指定​​partition​​​ 但指定了​​key​​​的情况下,将key的hash值与topic的partition
    数进行取余得到partition值。
  3. 没有指​​partition​​​和​​key​​​,在一个批次满的时候会发送到同一个分区,当一个新的批次创建时,会发送到另外的分区中。可以通过​​KIP-480​​ 获取更多的粘性分区知识。

2、自定义分区策略

  1. 实现​​org.apache.kafka.clients.producer.Partitioner​​ 接口。
  2. 代码实现
Properties prop = new Properties();
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"自定义实现的分区器的全路径");
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

三、生产者 acks 应答机制

1、acks = 0

​producer​​​不等待​​broker​​​返回​​ack​​信息,只是将消息加入到socket的缓冲中,可能还未发送出去。

  1. 重试(​​retries​​)不会生效。
  2. 为每个记录返回的偏移量始终是​​-1​​。
  3. 可能会存在​​丢失数据​​的情况。

2、acks = 1

​producer​​​等待​​broker​​​返回​​ack​​​信息,只要这个​​partition​​​的​​leader​​​将消息成功保存后返回​​ack​​。

  1. 如果​​leader​​​返回​​ack​​​后,但是还未同步给​​follower​​​,此时​​leader​​​宕机了,可能会出现​​丢失数据​​的情况。

3、acks = [all | -1]

​producer​​​等待​​broker​​​返回​​ack​​​信息,​​partition​​​的​​leader​​​和​​follower​​​全部将消息保存成功后,才返回​​ack​​信息。

  • 如果在​​leader​​和​​follower​​将消息都保存完毕,但是在返回​​ack​​之前,​​leader​​宕机了,此时可能导致 ​​消息重复​​。
  • ​此处指的是 所有的follower 都不同完,而不是半数以上同步完,才发送ack。​
- 因为,当需要容忍 `n` 台节点故障时,`半数以上` 需要`2n+1`个副本,而`所有follower`都同步完,只需要`n+1`个副本。

四、日志文件的 HW 和 LEO

1、HW

​HW​​​ : 即 High Watermark,指的是该分区(​​partition​​​)中所有副本(leader+follower)中​​最小的LEO​​。

  • 上面中所有的副本:指的是 该分区的​​ISR集合。​
  • ​HW 之前的消息是可消费的​​,比如 HW=5,那么可以消费的 offset 是 [0-4] 不包括5.

2、LEO

​LEO​​​ :即 Log End Offset,即当前日志文件中,​​下一条​​消息待写入的 offset。

3、举例说明

比如,一个分区下有3个ISR,一个leader和2个follower。
leader 的leo 是 6
follower01 的leo是 5
follower02 的 leo 是 4
那么该分区的 hw 是 4,即最小的leo。

kafka的基本概念_kafka分区策略_03

五、消费者的分区策略

我们知道,我们的消费者(​​consumer​​​)是隶属于消费者组(​​consumer group​​​)的,消费者组中可以存在多个消费者,每个消费者可以消费多个主题(​​topic​​​),每个主题又存在多个分区(​​partition​​),每个分区只能由一个消费者来消费,每个消费者又可以消费多个分区。那么就必然涉及到 如何将某个分区分配给那个消费者消费

1、RangeAssignor - 基于订阅的 topic来分配
     ​​​默认分区策略​​​ 2、RoundRobinAssignor - 基于consumer group来分配,​​可能出现误消费别的主题的情况​

eg:
topicA 存在 0,1,2 三个分区
topicB 也存在 0,1,2三个分区
consumerA 和 consumerB 同属于一个组
consumerA 订阅 topicA
consumerB 订阅 tocpiA 和 topicB
此时按照 RoundRobinAssignor 策略,会先将所有的分区进行排序,则会产生6个主题分区对象(`TopicPartition`),因为topicA和topicB 每个主题共有3个分区。此时就有可能将topicB的消息发送给了cousumerA,导致消费错误。

3、StickyAssignor -
4、CooperativeStickyAssignor -

六、消费者offset的维护

1、自动提交offset

Properties prop = new Properties();
// 设置自动提交offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 自动提交offset的间隔,单位ms
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(prop);

2、手动提交offset

​consumer.commitAsync()​​​ : 异步提交
​​​consumer.commitSync()​​ : 同步提交

Properties prop = new Properties();
// 设置手动提交offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Arrays.asList("topicA", "topicB"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.out.println("offset: " + record.offset() + "value:" + record.value());
}
// 异步提交offset
consumer.commitAsync();
// 同步提交offset
// consumer.commitSync();
}

3、自定义提交offset

需要实现 ​​org.apache.kafka.clients.consumer.ConsumerRebalanceListener​​​ 接口,在消费者发生 ​​rebalance​​​ 时,保存或获取自定义的offset。
kafka的基本概念_kafka_04

七、拦截器

1、生产者拦截器

实现​​org.apache.kafka.clients.producer.ProducerInterceptor​​​接口。可以实现消息发送到​​kafka broker​​之前的消息拦截。

1、onSend(ProducerRecord<K, V> record)方法
该方法运行在主线程中,我们可以在该方法中对消费进行任何操作,但是最好​​​不要修改topic​​​和​​partition​​,否则可能影响消息目标分区的计算。

2、onAcknowledgement(RecordMetadata metadata, Exception exception)
该方法运行在​​​producer 的 I/O​​线程中,因此不要执行一些比较耗时的操作,否则会拖慢producer的发送消息的速度。该方法在消息发送kafka broker之后返回ack之后执行或发送到kafka broker的过程中发生异常执行。

​需要自己保证实现的拦截器的线程安全问题。​

Properties prop = new Properties();
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现ProducerInterceptor类的全类名"));
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

2、消费者拦截器

实现​​org.apache.kafka.clients.consumer.ConsumerInterceptor​​​类,可以对从​​kafka broker​​​获取到的消息进行拦截。和​​consumer#poll​​运行在同一个线程中。

Properties prop = new Properties();
prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现ConsumerInterceptor类的全类名"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);

八、参考文档

​​1、http://kafka.apache.org/documentation/#gettingStarted​​


举报

相关推荐

0 条评论