0
点赞
收藏
分享

微信扫一扫

【檀越剑指大厂—kafka】kafka高阶篇


一.认识 kafka

1.kafka 的定义?

Kafka 传统定义:Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。

Kafka 最新定义: Kafka 是一个开源的分布式事件流平台( Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

2.kafka 应用场景?

  • 消息系统
  • 储存系统
  • 流式处理平台

消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。

存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。

流式处理平台: Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。

3.为什么不使用 Flume?

Kafka 更侧重于数据的存储以及流数据的实时处理,是一个追求高吞吐量、高负载的消息队列。

Flume 则是侧重于数据的采集和传输,提供了很多种接口支持多种数据源的采集,但是 Flume 并不直接提供数据的持久化。
就吞吐量和稳定性来说,Flume 不如 Kafka。所以在使用场景上,如果你需要在两个数据生产和消费速度不同的系统之间传输数据,比如实时产生的数据生产速度会经常发生变化,时段不同会有不同的峰值,如果直接写入 HDFS 可能会发生拥堵,在这种过程中加入 Kafka,就可以先把数据写入 Kafka,再用 Kafka 传输给下游。而对于 Flume 则是提供了更多封装好的组件,也更加轻量级,最常用于日志的采集,省去了很多自己编写代码的工作。

二.生产者

1.发送流程

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator, 也称为消息收集器)中。

Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。

  • batch.size: 只有数据积累到 batch.size 之后,sender 才会发送数据。默认 16k
  • linger.ms: 如果数据迟迟未达到 batch.size,sender 等待 linger.ms 设置的时间到了之后就会发送数据。单位 ms, 默认值是 0ms,表示没有延迟。

【檀越剑指大厂—kafka】kafka高阶篇_kafka

应答 acks:

  • 0:生产者发送过来的数据,不需要等数据落盘应答。
  • 1:生产者发送过来的数据,Leader 收到数据后应答。
  • -1 (all) :生产者发送过来的数据,Leader 和 ISR 队列里面的所有节点收齐数据后应答。-1 和 all 等价

2.生产者如何提高吞吐量?

  • batch.size: 批次大小,默认 16k,可以修改为 32k
  • linger.ms: 等待时间,默认为 0,可以修改为 5-100ms
  • compression.type: 默认值为 producer,可以修改为 snappy
  • RecordAccumulator:缓冲区大小,默认 32M,修改为 64M

3.什么是 ISR?

分区中的所有副本统称为 AR (Assigned Replicas)所有与 leader 副本保持一定程度同步的副本( 包括 leader 副本在内)组成 ISR.滞后的为 OSR.

Leader 收到数据,所有 Follower 都开始同步数据,但有一个 Follower,因为某种故障,迟迟不能与 Leader 进行同步,那这个问题怎么解决呢?
Leader 维护了一个动态的 in-sync replica set (ISR) ,意为和 Leader 保持同步的 Follower+I eader 集合(leader: 0, isr:0,1,2)。如果 Follower 长时间未向 Ieader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。例如 2 超时,(leader:0, isr:0,1)。这样就不用等长期联系不上或者已经故障的节点。

数据完全可靠条件= ACK 级别设置为-1+ 分区副本大于等于 2+ ISR 里应答的最小副本数量大于等于 2

可靠性总结:
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据 Leader 应答,可靠性中等,效率中等;默认为 1
acks=-1,生产者发送过来数据 Leader 和 ISR 队列里面所有 Follwer 应答,可靠性高,效率低;
在生产环境中,acks=0 很少使用; acks=1, 一般用于传输普通日志,允许丢个别数据; acks=-1, 一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

4.如何解决数据重复?

  • 至少一次(At Least Once) = ACK 级别设置为-1 +分区副本大于等于 2 + ISR 里应答的最小副本数量大于等于 2
  • 最多一次(At Most Once) = ACK 级别设置为 0

总结:

  • At Least Once 可以保证数据不丢失,但是不能保证数据不重复;
  • At Most Once 可以保证数据不重复,但是不能保证数据不丢失。

精确一次(Exactly Once) :对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

kafka 在 0.11 版本引入了幂等性和事务.

幂等性就是指 Producer 不论向 Broker 发送多少次重复数据,Broker 端都只会持久化一条,保证了不重复。

精确一次(Exactly Once) =幂等性+至少-次( ack=-1 +分区副本数>=2 + ISR 最小副本数量>=2)。

重复数据的判断标准:具有<PID, Partition, SeqNumber> 相同主键的消息提交时,Broker 只会持久化一条。其
中 PID 是 Kafka 每次重启都会分配一个新的; Partition 表示分区号; Sequence Number 是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复

  • pid
  • 分区号
  • 序列号

如何使用幂等性,开启参数 enable.idempotence 默认为 true,默认打开状态, false 关闭。

5.kafka 事务原理?

开启事务,必须开启幂等性.

  • Producer 在使用事务功能前,必须先自定义一个唯一的 transactional.id。 有了 transactional.id, 即使客户端挂掉了,它重启后也能继续处理未完成的事务
  • 默认有 50 个分区,每个分区负责一部分事务。事务划分是根据 transactional.id 的 hashcode 值%50,计算出该事务属于哪个分区。该分区 Leader 副本所在的 broker 节点即为这个 transactional.id 对应的 Transaction Coordinator 节点。

【檀越剑指大厂—kafka】kafka高阶篇_数据_02

事务的执行过程

  • 初始化
  • 启动
  • 消费 offset
  • 提交
  • 终止

6.如何保证数据有序?

  • 单分区:有序(有条件)
  • 多分区:无序,可以在消费端进行排序,需要统筹下资源后进行排序.

kafka 有序的原理

  • kafka 在 1.x 版本之前保证数据单分区有序,条件如下:
  • max.in.flight.requests.per.connection=1 (不需要考虑是否开启幂等性)。
  • kafka 在 1.x 及以后版本保证数据单分区有序,条件如下:
  • (1)未开启幂等性
  • max.in.flight.requests.per.connection 需要设置为 1。
  • (2)开启幂等性
  • max.in.flight.requests.per.connection 需要设置小于等于 5。
  • 原因说明:因为在 kafka1.x 以后,启用幂等后,kafka 服务端会缓存 producer 发来的最近 5 个 request 的元数据,
  • 故无论如何,都可以保证最近 5 个 request 的数据都是有序的。

7.生产者异步代码

  1. 配置参数
  • 连接 bootstrap-server
  • key value 序列化
  1. 创建生产者
  • KafkaProducer<string, String> ()
  1. 发送数据
  • send () send (, new Callback)
  1. 关闭资源

public static void main(String[] args) {
  //1.配置文件
  Properties properties = new Properties();
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class);
  //2.创建生产者
  KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  //3.发送数据
  for (int i = 0; i < 5; i++) {
    kafkaProducer.send(new ProducerRecord<>("first", "hello" + i)); //主题+value
  }
  //4.关闭资源
  kafkaProducer.close();
}

8.生产者同步发送

public static void main(String[] args) throws ExecutionException, InterruptedException {
  //1.配置文件
  Properties properties = new Properties();
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class);
  //2.创建生产者
  KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  //3.发送数据
  for (int i = 0; i < 5; i++) {
    kafkaProducer.send(new ProducerRecord<>("first", "hello" + i)).get(); //主题+value
  }
  //4.关闭资源
  kafkaProducer.close();
}

9.自定义分区

表名作为 key 发到一个分区

public class BizPartitioner implements Partitioner {

  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return 1;
  }

  @Override
  public void close() {

  }

  @Override
  public void configure(Map<String, ?> configs) {

  }
}

public class BizPartitionerTest {

  public static void main(String[] args) {
    Map<String, Object> configs = Maps.newHashMap();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 添加分区对应的业务分区,自定义实现类
    configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, BizPartitioner.class);
    // 创建生产者实例
    Producer producer = new KafkaProducer(configs);
    try {
      for (int i = 0; i < 10; i++) {
        // 创建消息
        ProducerRecord record = new ProducerRecord("Hello-Kafka", "hello kitty " + i + "!");
        // 发送并获取 发送结果
        RecordMetadata metadata = (RecordMetadata) producer.send(record).get();
        System.out.println(String.format("topic=%s, msg=%s, partition=%s, offset=%s, timestamp=%s",
                                         metadata.topic(), record.value(),
                                         metadata.partition(), metadata.offset(), metadata.timestamp()));
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    producer.close();
  }
}

10.数据分区原则

注意:如果 key 不为 null, 那么计算得到的分区号会是所有分区中的任意一个;如果 key 为 null, 那么计算得到的分区号仅为可用分区中的任意一个,注意两者之间的差别。

消息被顺序追加到每个分区日志文件的尾部。Kafka 中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个 broker,以此来提供比单个 broker 更强大的性能。

ProducerRecord 的构造方法较多,也是处理业务需求的关键

(1)指明 partition 的情况下,直接将指明的值作为 partition 值;例如 partition=0,所有数据写入分区 0

(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
例如: key1 的 hash 值=5,key2 的 hash 值=6 ,topic 的 partition 数=2,那么 key1 对应的 value1 写入 1 号分区,key2 对 应的 value2 写入 0 号分区。

(3)既没有 partition 值又没有 key 值的情况下,Kafka 采用 Sticky Partition ( 黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的 batch 已满或者已完成,Kafka 再随机一个分区进行使用(和上一次的分区不同)。
例如:第一次随机选择 0 号分区, 等 0 号分区当前批次满了(默认 16k) 或者 linger.ms 设置的时间到,Kafka 再随机一个分区进行使用(如果还是 0 会继续随机)。

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
  this(topic, partition, timestamp, key, value, (Iterable)null);
}

public ProducerRecord(String topic, Integer partition, K key, V value) {
  this(topic, partition, (Long)null, key, value, (Iterable)null);
}
public ProducerRecord(String topic, K key, V value) {
  this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}
public ProducerRecord(String topic, V value) {
  this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
}

11.生产者参数

  • acks 默认 1
  • max.request.size 默认 1M
  • retries 和 retry.backoff.ms 默认值 0,默认值 100
  • compression.type 默认 none
  • connections.max.idle.ms 默认 9 分钟
  • linger.ms 默认值为 0
  • receive.buffer.bytes 默认值 32kb
  • request.timeout.ms 默认值 30000ms

三.消费者

1.kafka 消费方式

  • pull (拉)模式:consumer 采用从 broker 中主动拉取数据。
  • push(推)模式:broker 推送.

Kafka 采用拉模式。 Katka 没有采用推模式,因为由 broker 决定消息发送速率,很难适应所有消费者的消费速率。例如:推送的速度是 50m/s,Consumer 1、Consumer2 就来不及处理消息。pull 模式不足之处是,如果 Kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。

2.消息队列的两种模式?

1.点对点模式

  • 一个生产者,一个消费者,一个 topic
  • 消费者主动拉取数据,消息收到后清除消息
    2.发布/订阅模式
  • 可以有多个 topic 主题(浏览、点赞、收藏、评论等)
  • 消费者消费数据之后,不删除数据
  • 每个消费者相互独立,都可以消费到数据

如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

3.kafka 消费者总体流程

Consumer Group (CG):消费者组,由多个 consumer 组成。形成一个消费者组的条件,是所有消费者的 groupid 相同

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

【檀越剑指大厂—kafka】kafka高阶篇_幂等性_03

4.消费者组的初始化

coordinator:辅助实现消费者组的初始化和分区的分配。
coordinator 节点选择二 groupid 的 hashcode 值 %50(consumer offsets 的分区数量)
例如:groupid 的 hashcode 值= 1, 1%50=1,那么 consumer_ offsets 主题的 1 号分区,在哪个 broker 上,就选择这个节点的 coordinator 作为这个消费者组的老大。消费者组下的所有的消费者提交 otfset 的时候就往这个分区去提交 offset.

【檀越剑指大厂—kafka】kafka高阶篇_数据_04

5.消费者组的消费流程

[外链图片转存中…(img-Ugf0UqH8-1668135785067)]

6.常见消费场景

  • 普通消费
  • 多线程消费
  • 指定分区消费
  • 正则订阅
  • 自定义反序列化,Deserializer
  • 设置分区分配策略
  • offset 自动提交,手动提交
  • 指定 offset 进行消费
  • 指定时间进行消费

#多线程消费KafkaConsumerProcess实现Runnable
new Thread(new KafkaConsumerProcess(properties, "Hello-Kafka"), "thread-one").start();

#分配对应的分区
consumer.assign(ImmutableList.of(new TopicPartition("Hello-Kafka", 0)));

#正则订阅主题
consumer.subscribe(Pattern.compile("Hello-.*"));

#自定义反序列化
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());

public class CompanyDeserializer implements Deserializer<Company>
@Override
public Company deserialize(String topic, byte[] data) {}


#设置分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());

7.消费者时间参数

session.timeout.ms:group coordinator 检测 consumer 发生崩溃所需的时间。一个 consumer group 里面的某个 consumer 挂掉了,最长需要 session.timeout.ms 秒检测出来。它指定了一个阈值—45 秒,在这个阈值内如果 group coordinator 未收到 consumer 的任何消息(指心跳),那 coordinator 就认为 consumer 挂了。

heartbeat.interval.ms:默认 3 秒,每个 consumer 都会根据 heartbeat.interval.ms 参数指定的时间周期性地向 group coordinator 发送 hearbeat,group coordinator 会给各个 consumer 响应,若发生了 rebalance,各个 consumer 收到的响应中会包含 REBALANCE_IN_PROGRESS 标识,这样各个 consumer 就知道已经发生了 rebalance,同时 group coordinator 也知道了各个 consumer 的存活情况。
max.poll.interval.ms:如果 consumer 两次 poll 操作间隔超过了这个时间,broker 就会认为这个 consumer 处理能力太弱,会将其踢出消费组,将分区分配给别的 consumer 消费 ,触发 rebalance 。默认 5 分钟.

总结:new KafkaConsumer 对象后,在 while true 循环中执行 consumer.poll 操作拉取消息中,会有两个线程执行:一个是 heartbeat 线程,另一个是 processing 线程。

  • processing 线程可理解为调用 consumer.poll 方法执行消息处理逻辑的线程,
  • 而 heartbeat 线程是一个后台线程,对程序员是"隐藏不见"的。heartbeat 线程每隔 heartbeat.interval.ms 向 coordinator 发送一个心跳包,证明自己还活着。但是如果 group coordinator 在一个 heartbeat.interval.ms 周期内未收到 consumer 的心跳,就把该 consumer 移出 group,这有点说不过去。事实上,有可能网络延时,有可能 consumer 出现了一次长时间 GC,影响了心跳包的到达,说不定下一个 heartbeat 就正常了。而如果 group coordinator 在 session.timeout.ms 内未收到 consumer 的心跳,那就会把该 consumer 移出 group。
  • 而 max.poll.interval.ms 参数,在 consumer 两次 poll 操作间隔超过了这个时间,即 consumer 的消息处理逻辑时长超过了 max.poll.interval.ms,该消费者就会被提出消费者组。

max.poll.interval.ms 设置不合理,会导致不停地 rebalance,一般会做一些失败的重试处理。比如通过线程池的 ThreadPoolExecutor#afterExecute()方法捕获到异常,再次提交 Runnable 任务重新订阅 kafka topic。本来消费处理需要很长的时间,如果某个 consumer 处理超时:消息处理逻辑的时长大于 max.poll.interval.ms (或者消息处理过程中发生了异常),被 coordinator 移出了 consumer 组,这时由于失败的重试处理,自动从线程池中拿出一个新线程作为消费者去订阅 topic,那么意味着有新消费者加入 group,就会引发 rebalance,而可悲的是:新的消费者还是来不及处理完所有消息,又被移出 group。如此循环,就发生了不停地 rebalance 的现象。

8.分区分配策略

消费组中的 consumer 是如何确定自己该消费哪些分区的数据的?

Kafka 提供了多种分区策略如 RoundRobin(轮询)、Range(按范围),Sticky,CooperativeSticky

可通过参数 partition.assignment.strategy 进行配置,默认是 Range+CooperativeSticky

9.分区重分配的条件

  • Consumer Group 中的 consumer 发生了新增或者减少
  • Consumer Group 订阅的 topic 分区发生变化如新增分区

10.Range 策略

Range 策略是针对 topic 而言的,在进行分区分配时,为了尽可能保证所有 consumer 均匀的消费分区,会对同一个 topic 中的 partition 按照序号排序,并对 consumer 按照字典顺序排序。

然后为每个 consumer 划分固定的分区范围,如果不够平均分配,那么排序靠前的消费者会被多分配分区。具体就是将 partition 的个数除于 consumer 线程数来决定每个 consumer 线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多分配分区。

#当存在有2个Kafka topic(t1和t2),它们都有有10个partition,那么最后分区结果为:
C0-0 将消费t1主题的0、1、2、3分区以及t2主题的0、1、2、3分区
C1-0 将消费t1主题的4、5、6分区以及t2主题的4、5、6分区
C2-1 将消费t1主题的7、8、9分区以及t2主题的7、8、9分区

range 策略的问题,topic 多了后,消费者分配不均,会出现部分消费者过载.数据倾斜.

Range 策略,某一个消费者挂了后,会全部分配到其他某一个分区,再次发送,会重新分配

11.RoundRobin 策略

RoundRobin 策略的工作原理:将所有 topic 的 partition 组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,最后按照 RoundRobin 风格将分区分别分配给不同的消费者。

使用 RoundRobin 策略必须满足以下条件:

  • 同一个 Consumer Group 里面的所有 consumer 的 num.streams 必须相等
  • 每个 consumer 订阅的 topic 必须相同

假设消费组 CG1 中有 C0 和 C1 两个 consumer 的 num.streams 都为 2。按照 hashCode 排序完的 topic-partition 组依次为 t1-5, t1-3, t1-0, t1-8, t1-2, t1-1, t1-4, t1-7, t1-6, t1-9,我们的消费者排序为 C0-0, C0-1, C1-0, C1-1,最后分区分配的结果为:

C0-0将消费t1-5、t1-2、t1-6分区
C0-1将消费t1-3、t1-1、t1-9分区
C1-0将消费t1-0、t1-4分区
C1-1将消费t1-8、t1-7分区

12.Sticky 分配策略

粘性策略它主要有两个目的:

  • 分区的分配要尽可能的均匀
  • 分区的分配尽可能的与上次分配的保持相同

当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor 策略的具体实现要比 RangeAssignor 和 RoundRobinAssignor 这两种分配策略要复杂很多。

13.offset 的自动提交

为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。
自动提交 offset 的相关参数:

  • enable.auto.commit: 是否开启自动提交 offset 功能,默认是 true
  • auto.commit.interval.ms:自动提交 offset 的时间间隔,默认是 5s

14.offset 的手动提交

对于采用 commitSync () 的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用 commitSync () 的另一个含参方法,具体定义如下:

指定 offset 进行提交

public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {}

15.指定 offset

auto.offset.reset = earliest | latest | none  默认是latest

当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

  1. earliest: 自动将偏移量重置为最早的偏移量,-- fromn-beginning。
  2. latest (默认值):自动将偏移量重置为最新偏移量。
  3. none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

seek()方法为我们提供了从特定位置读取消息的能力,我们可以通过这个方法来向前跳过若干消息,也可以通过这个方法来向后回溯若干消息,这样为消息的消费提供了很大的灵活性。seek ()方法也为我们提供了将消费位移保存在外部存储介质中的能力,还可以配合再均衡监听器来提供更加精准的消费能力。

public void seek(TopicPartition partition, long offset) {}

16.漏消费和重复消费

  • 重复消费:已经消费了数据,但是 offset 没提交。
  • 漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

重发消费。自动提交 otset 引起

漏消费。设置 offset 为手动提交,当 offset 被提交吋,数据还在内存中末落盘,此时刚好消费者线程被 kil 掉,那么 offset 己经提交,但是数据未处理,导致这部分内存中的数据丢失。

[外链图片转存中…(img-fSCBzHNh-1668135785068)]

17.消费者事务

如果想完成 Consumer 端的精准一次性消 费,那么需要 Kafka 消费端将消费过程和提交 offset 过程做原子绑定。此时我们需要将 Kafka 的 offset 保存到支持事务的自定义介质(比如 MySQL)。

下游消费者必须支持事务,才能做到精确一次消费.

18.如何提高消费者吞吐量?

  • 如果是 Kafka 消费能力不足,则可以考虑增加 Topic 的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
  • 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),
    使处理的数据小于生产的数据,也会造成数据积压。

19.消费者参数

  • fetch.min.bytes 拉取的最小数据量 默认 1b
  • fetch.max.bytes 最大数据量 默认 50M
  • fetch.max.wait.ms 最大等待时间 默认 500ms
  • max.partition.fetch.bytes 每个分区返回的最大数据量,默认为 1M
  • max.poll.records 最大数据量 默认为 500 条
  • connections.max.idle.ms 多久之后关闭限制的连接 默认 9 分钟
  • receive.buffer.bytes 接收缓冲区的大小 默认为 64kb
  • receive.buffer.bytes 接收缓冲区的大小 默认为 128kb

20.消费者协调器

了解了 Kafka 中消费者的分区分配策略之后是否会有这样的疑问:如果消费者客户端中配置了两个分配策略,那么以哪个为准呢?如果有多个消费者,彼此所配置的分配策略并不完全相同,那么以哪个为准?多个消费者之间的分区分配是需要协同的,那么这个协同的过程又是怎样的呢?这一切都是交由消费者协调器(ConsumerCoordinator)和组协调器(GroupCoordinator) 来完成的,它们之间使用一套组协调协议进行交互。

旧版:使用 zookeeper 监听器来完成的.每个消费者在启动时都会在/consumers/group/ids 和/brokers/ids 路径上注册一个监听器。当/consumers/ group/ids 路径下的子节点发生变化时,表示消费组中的消费者发生了变化;当/brokers/ ids 路径下的子节点发生变化时,表示 broker 出现了增减。这样通过 ZooKeeper 所提供的 Watcher,每个消费者就可以监听消费组和 Kafka 集群的状态了。

这种方式下每个消费者对 ZooKeeper 的相关路径分别进行监听,当触发再均衡操作时,一个消费组下的所有消费者会同时进行再均衡操作,而消费者之间并不知道彼此操作的结果,这样可能导致 Kafka 工作在一个不正确的状态。与此同时,这种严重依赖于 ZooKeeper 集群的做法还有两个比较严重的问题。

  • 羊群效应(Herd Effect) :所谓的羊群效应是指 ZooKeeper 中一个被监听的节点变化,大量的 Watcher 通知被发送到客户端,导致在通知期间的其他操作延迟,也有可能发生类似死锁的情况。
  • 脑裂问题(Split Brain) :消费者进行再均衡操作时每个消费者都与 ZooKeeper 进行通信以判断消费者或 broker 变化的情况,由于 ZooKeeper 本身的特性,可能导致在同一时刻各个消费者获取的状态不一致,这样会导致异常问题发生。

新版:新版的消费者客户端对此进行了重新设计,将全部消费组分成多个子集,每个消费组的子集在服务端对应一个 GroupCoordinator 对其进行管理,GroupCoordinator 是 Kafka 服务端中用于管理消费组的组件。而消费者客户端中的 ConsumerCoordinator 组件负责与 GroupCoordinator 进行交互。ConsumerCoordinator 与 GroupCoordinator 之间最重要的职责就是负责执行消费者再均衡的操作,包括前面提及的分区分配的工作也是在再均衡期间完成的。就目前而言

21._consumer_offsets 剖析

_consumer_offsets是kafka默认创建的,一共50个分区.

一般情况下,当集群中第一次有消费者消费消息时会自动创建__consumer_offsets,它的副本因子受 offsets.topic.replication.factor 参数的约束,默认值为3(注意:该参数的使用限制在0.11.0.0版本发生变化),分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为50。

客户端提交消费位移是使用 OffsetCommitRequest 请求实现的,OffsetCommitRequest 的结构如图所示。

retention_time 表示当前提交的消费位移所能保留的时长,不过对于消费者而言这个值保持为-1。也就是说,按照
broker 端的配置 offsets.retention.minutes 来确定保留时长。offsets.retention.minutes 的默认值为 10080,即 7 天,超过这个时间的消息会被删除.

【檀越剑指大厂—kafka】kafka高阶篇_kafka_05

同消费组的元数据信息一样,最终提交的消费位移也会以消息的形式发送至主题 consumer_ offsets, 与消费位移对应的消息也只定义了 key 和 value 字段的具体内容,它不依赖于具体版本的消息格式,以此做到与具体的消息格式无关。

【檀越剑指大厂—kafka】kafka高阶篇_kafka_06

key 中除了 version 字段还有 group、topic、partition 字段,分别表示消费组的 groupld、主题名称和分区编号。虽然 key 中包含了 4 个字段,但最终确定这条消息所要存储的分区还是根据单独的 group 字段来计算的,这样就可以保证消费位移信息与消费组对应的 GroupCoordinator 处于同一个 broker 节点上,省去了中间轮转的开销,这一点与消费组的元数据信息的存储是一样的。

value 中包含了 5 个字段,除 version 字段外,其余的 offset、metadata、commit_ timestamp、 expire timestamp 字段分别表示消费位移、自定义的元数据信息、位移提交到 Kafka 的时间戳、消费位移被判定为超时的时间戳。其中 offset 和 metadata 与 OffsetCommitRequest 请求体中的 offset 和 metadata 对应,而 expire .timestamp 和 OffsetCommitRequest 请求体中的 retention* time 也有关联,commit* timestamp 值与 offsets.retention.minutes 参数值之和即为 expire_ _timestamp (默认情况下)。

在处理完消费位移之后,Kafka 返回 OffsetCommitResponse 给客户端,OffsetCommitResponse 的结构如图所示。

【檀越剑指大厂—kafka】kafka高阶篇_分布式_07

冷门知识:如果有若干消费者消费了某个主题中的消息,并且也提交了相应的消费位移,那么在删除这个主题之后会一并将这些消费位移信息删除。

四.broker 服务

1.基础架构

一一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer ,以及一个 ZooKeeper 集群。其中
ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器的选举等操作的。Producer 将消息发送到 Broker, Broker 负责将收到的消息存储到磁盘中,而 Consumer 负 责从 Broker 订阅并消费消息。

Server.config配置3个参数,
brokerid:唯一
日志:地址
zk:地址

【檀越剑指大厂—kafka】kafka高阶篇_kafka_08

2.高效读写

  • 集群 采用分区
  • 稀疏索引
  • 顺序读写
  • 零拷贝与页缓存

3.broker 的工作流程

Broker: 服务代理节点。对于 Kafka 而言,Broker 可以简单地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例。一个或多个 Broker 组成了一个 Kafka 集群。一 般而言,我们更习惯使用首字母小写的 broker 来表示服务代理节点。

  • broker
  • zookeeper
  • controller
  • isr
  • leader

[外链图片转存中…(img-2xzwac9B-1668135785068)]

4.存储机制

  • broker
  • topic
  • partition
  • log
  • segment
  • 稀疏索引

5.zk 中存储了哪些信息?

节点

内容

说明

/kafka/brokers/ids

[0,1,2]

记录有哪些服务器

/kafka/brokers/topics/first/partitions/0/state

{“leader”:1, “isr”:[1,0,2]}

记录谁是 Leader,有哪些服务器可用

/kafka/controller

{“brokerid”:0}

辅助选举 Leader

【檀越剑指大厂—kafka】kafka高阶篇_分布式_09

  • admin: 存储管理员接口操作的相关信息,主要为 topic 删除事件,分区迁移事件,优先副本选举,信息 (一般为临时节点)
  • brokers: 主要存储 broker 相关的信息,broker 节点以及节点上的 topic 相关信息
  • cluster: 存储 kafka 集群信息
  • config: 存储 broker,client,topic,user 以及 changer 相关的配置信息
  • consumers: 存放消费者相关信息 (一般为空)
  • controller: 用于存放控制节点信息 (注意:该节点是一个临时节点,用于 controller 节点注册)
  • controller_epoch: 用于存放 controller 节点当前的年龄
  • isr_change_notification: 用于存储 isr 的变更通知 (临时节点,当有 isr 进行变动时,会用于事件通知,可进行 watch 获取集群 isr 状态变更)
  • latest_producer_id_block: 该节点用于存储处理事务相关的 pid 范围
  • log_dir_event_notification: 日志目录事件通知

6.offset 存储位置?

  • 0.9 版本之前保存在 zookeeper 下的 consumers 中
  • 0.9 版本之后 offset 存储在 kafka 主题中,该 topic 为_consumer_offsets,不在 zookeeper 中

consumer_offsets 是 kafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。

consumer_offsets 的每条消息格式大致如图所示:

【檀越剑指大厂—kafka】kafka高阶篇_kafka_10

可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值。

考虑到一个 kafka 生成环境中可能有很多 consumer 和 consumer group,如果这些 consumer 同时提交位移,则必将加重 **consumer_offsets 的写入负载,因此 kafka 默认为该 topic 创建了 50 个分区,并且对每个 group.id 做哈希求模运算,从而将负载分散到不同的 **consumer_offsets 分区上。

一般情况下,当集群中第一次有消费者消费消息时会自动创建 __consumer_offsets,它的副本因子受 offsets.topic.replication.factor 参数的约束,默认值为 3(注意:该参数的使用限制在 0.11.0.0 版本发生变化),分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为 50。

7.leader 选举的过程

kafka 中的控制器选举工作依赖于 Zookeeper,成功竞选成为控制器的 broker 会在Zookeeper中创建/controller临时节点。

每个 broker 启动的时候会去尝试读取/controller 节点的 brokerid的值

  • 如果读取到的 brokerid 的值不为-1,表示已经有其他broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选;

如果Zookeeper中不存在/controller 节点,或者这个节点的数据异常,那么就会尝试去创建/controller 节点,创建成功的那个 broker 就会成为控制器。

每个 broker 都会在内存中保存当前控制器的 brokerid 值,这个值可以标识为 activeControllerId。

Zookeeper 中还有一个与控制器有关的/controller_epoch 节点,这个节点是持久节点,节点中存放的是一个整型的 controller_epoch 值。controller_epoch 值用于记录控制器发生变更的次数。

controller_epoch 的初始值为1,即集群中的第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1。

每个和控制器交互的请求都会携带 controller_epoch 这个字段,

  • 如果请求的 controller_epoch 值小于内存中的 controller_epoch值,则认为这个请求是向已经过期的控制器发送的请求,那么这个请求会被认定为无效的请求。
  • 如果请求的 controller_epoch 值大于内存中的 controller_epoch值,那么说明已经有新的控制器当选了

【檀越剑指大厂—kafka】kafka高阶篇_java_11

【檀越剑指大厂—kafka】kafka高阶篇_kafka_12

【檀越剑指大厂—kafka】kafka高阶篇_java_13

选取原则:先看 isr 中有没有,如果没有,直接 pass,如果有,继续看副本中的顺序,以副本中的顺序进行选取.

8.什么是 LEO 和 HW?

LEO (Log End offset):每个副本的最后一个 offset, LEO 其实就是最新的 offset + 1
HW (High Watermark):所有副本中最小的 LEO,消费者只能消费 HW 之前的消息.
HW/LEO这两个都是指最后一条的下一条的位置而不是指最后一条的位置。
LSO:Last Stable Offset 对未完成的事务而言,LSO 的值等于事务中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值同 HW 相同
LW:Low Watermark 低水位, 代表 AR 集合中最小的 logStartOffset 值

对于副本而言,还有两个概念:本地副本(Local Replica) 和远程副本(Remote Replica) ,本地副本是指对应的 Log 分配在当前的 broker 节点上,远程副本是指对应的 Log 分配在其他的 broker 节点上。在 Kafka 中,同一个分区的信息会存在多个 broker 节点上,并被其上的副本管理器所管理,这样在逻辑层面每个 broker 节点上的分区就有了多个副本,但是只有本地副本才有对应的日志。

如图所示,某个分区有 3 个副本分别位于 broker0、broker1 和 broker2 节点中,其中带阴影的方框表示本地副本。假设 broker0 上的副本 1 为当前分区的 leader 副本,那么副本 2 和副本 3 就是 follower 副本,整个消息追加的过程可以概括如下:

  1. 生产者客户端发送消息至 leader 副本(副本 1) 中。
  2. 消息被追加到 leader 副本的本地日志,并且会更新日志的偏移量。
  3. follower 副本(副本 2 和副本 3) 向 leader 副本 请求同步数据。
  4. leader 副本所在的服务器读取本地日志,并更新对应拉取的 follower 副本的信息。
  5. leader 副本所在的服务器将拉取结果返回给 follower 副本。
  6. follower 副本收到 leader 副本 返回的拉取结果,将消息追加到本地日志中,并更新日志的偏移量信息。

【檀越剑指大厂—kafka】kafka高阶篇_kafka_14

9.LEO 和 HW 更新逻辑?

  1. leader 中写入消息,leo 为 5,hw 为 0
  2. follower 请求 leader,带上自己的 leo 值
  3. leader 更新自己的 hw 值 min(自己的 hw,副本的 leo)
  4. leader 发送消息给 follower,带上 hw 值
  5. follower 接收消息,leo 接收的不同,但是 hw 和 leader 保持一致
  6. 重复上述操作

【檀越剑指大厂—kafka】kafka高阶篇_分布式_15

在一个分区中,leader 副本所在 的节点会记录所有副本的 LEO,而 follower 副本所在的节点只会记录自身的 LEO,而不会记录其他副本的 LEO。对 HW 而言,各个副本所在的节点都只记录它自身的 HW。leader 副本中带有其他 follower 副本的 LEO,那么它们是什么时候更新的呢?

leader 副本收到 follower 副本的 FetchRequest 请求之后,它首先会从自己的日志文件中读取数据,然后在返回给
follower 副本数据前先更新 follower 副本的 LEO。

10.数据丢失

  • a 是 follower,b 是 leader,当 a 的 hw=1,b 的 hw=2 时,a 宕机,则 a 的数据 m2 丢失
  • b 宕机,a 变为 leader,hw=1,b 恢复为 follower,hw 被截断为 hw=1,数据 m2 丢失

【檀越剑指大厂—kafka】kafka高阶篇_数据_16

11.数据不一致

  • a 为 leader,b 为 follower,a 的 hw 和 leo 都为 2,b 的 hw 和 leo 都为 1,此时,a 和 b 同时挂掉,b 先恢复,且写入了一条数据 m3,更新 hw 和 leo 的值为 2
  • b 恢复,变为 follower,此时 a 和 b 的 hw 都为 2,不做任何调整,但是数据不一致,一个是 m2 一个是 m3

【檀越剑指大厂—kafka】kafka高阶篇_java_17

12.leader epoch

为了解决上述两种问题,Kafka 从 0.11.0.0 开始引入了 leader epoch 的概念,在需要截断数据的时候使用 leader epoch 作为 参考依据而不是原本的 HW。leader epoch 代表 leader 的纪元信息(epoch) ,初始值为 0。每当 leader 变更一次,leader epoch 的值就会加 1, 相当于为 leader 增设了一个版本号。与此同时,每个副本中还会增设一个矢量 LeaderEpoch=StartOffset,其中 StartOffset 表示当前 LeaderEpoch 下写入的第一条消息的偏移量。每个副本的 Log 下都有一个 leader-epoch-checkpoint 文件,在发生 leader epoch 变更时,会将对应的矢量对追加到这个文件中。

  • 数据丢失:请求 le 的值,如果相同,不做任何处理,如果不同,则返回 leader 的 leo 值,和 follower 的 leo 比较,如果相同则不做处理
  • 数据不一致:如果 le 的值变了,则截断数据,保持 leo 和 hw 同步

【檀越剑指大厂—kafka】kafka高阶篇_分布式_18

13.follower 故障处理

Follower 故障

  1. Follower 发生故障后会被临时踢出 ISR
  2. 这个期间 Leader 和 Follower 继续接收数据
  3. 待该 Follower 恢复后,Follower 会读取本地磁盘记录的上次的 HW,并将 log 文 件高于 HW 的部分截取掉,从 HW 开始向 Leader 进行同步。

【檀越剑指大厂—kafka】kafka高阶篇_java_19

14.leader 故障处理

  1. Leader 发生故障之后,会从 ISR 中选出一个新的 Leader
  2. 为保证多个副本之间的数据一致性, 其余的 Follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 Leader 同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

【檀越剑指大厂—kafka】kafka高阶篇_数据_20

15.leader partition 自平衡

正常情况下,Kafka 本 身会自动把 Leader Partition 均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些 broker 宕机,会导致 Leader Partition 过于集中在其他少部分几台 broker 上,这会导致少数几台 broker 的读写请求压力过高,其他宕机的 broker 重启之后都是 follower partition,读写请求很低,造成集群负载不均衡。

  • auto.leader.rebalance.enable,默认是 true。自动 Leader Partition 平衡
  • leader.imbalance.per.broker.percentage,默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
  • leader.imbalance.check. interval.seconds,默认值 300 秒。检查 leader 负载是否平衡的间隔时间。

假设集群只有一个主题如下图所示

【檀越剑指大厂—kafka】kafka高阶篇_分布式_21

针对 broker 0 节点,分区 2 的 AR 优先副本是 0 节点,但是 0 节点却不是 Ieader 节点,所以不平衡数加 1,AR 副本总数是 4,所以 broker 0 节点不平衡率为 1/4>10%,需要再平衡。

16.复制限流

分区重分配本质在于数据复制,先增加新的副本,然后进行数据同步,最后删除旧的副本来达到最终的目的。
数据复制会占用额外的资源,如果重分配的量太大必然会严重影响整体的性能,尤其是处于业务高峰期的时候。减小重分配的粒度,以小批次的方式来操作是一种可行的解决思路。如果集群中某个主题或某个分区的流量在某段时间内特别大,那么只靠减小粒度是不足以应对的,这时就需要有一个限流的机制,可以对副本间的复制流量加以限制来保证重分配期间整体服务不会受太大的影响。

17.分区数的上下限

下限:性能测试工具是 Kafka 本身提供的用于生产者性能测试的 kafka-producer-perf-test.sh 和用于消费者性能测试的 kafka-consumer-perf-test.sh

records sent 表示测试时发送的消息总数; records/sec 表示以每秒发送的消息数来统计吞吐量,括号中的 MB/sec 表示以每秒发送的消息大小来统计吞吐量,注意这两者的维度; avg latency 表示消息处理的平均耗时; max latency 表示消息处理的最大耗时; 50th、95th、99th 和 99.9th 分 别表示 50%、95%、99%和 99.9%的消息处理耗时。

随着分区数的增加,相应的吞吐量也会有所增长。一旦分区数超过了某个阈值之后,整体的吞吐量也是不升反降的,说明了分区数越多并不会使吞吐量一直增长。

一味地增加分区数并不能使吞吐量- -直 得到提升,并且分区数也并不能一直增加,如果超过默认的配置值,还会引起 Kafka 进程的崩溃。

18.协议设计

在实际应用中,Kafka 经常被用作高性能、可扩展的消息中间件。Kafka 自定义了一组基于 TCP 的二进制协议,只要遵守这组协议的格式,就可以向 Kafka 发送消息,也可以从 Kafka 中拉取消息,或者做一些其他的事情,比如提交消费位移等。

在目前的 Kafka 2.0.0 中,一共包含了 43 种协议类型,每种协议类型都有对应的请求(Request) 和响应(Response) ,它们都遵守特定的协议模式。每种类型的 Request 都包含相同结构的协议请求头(RequestHeader) 和不同结构的协议请求体(RequestBody)

19.时间轮

Kafka 中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka 并没有使用 JDK 自带的 Timer 或 DelayQueue 来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer) 。JDK 中 Timer 和 DelayQueue 的插入和删除操作的平均时间复杂度为 O (nlogn) 并不能满足 Kafka 的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为 O(1) 。时间轮的应用并非 Kafka 独有,其应用场景还有很多,在 Netty、Akka、Quartz、 ZooKeeper 等组件中都存在时间轮的踪影。

Kafka 中的时间轮(TimingWheel) 是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList) 。TimerTaskList 是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs) 。时间轮的时间格个数是固定的,可用 wheelSize 来表示,那么整个时间轮的总体时间跨度(interval) 可以通过公式 tickMsxwheelSize 计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime 是 tickMs 的整数倍.currentTime 可以将整个时间轮划分为到期部分和未到期部分,currentTime 当 前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的 TimerTaskL ist 中的所有任务。

20.为什么不支持读写分离?

  • 数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X,之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
  • 延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历网络 → 主节点内存 → 网络 → 从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络 → 主节点内存 → 主节点磁盘 → 网络 → 从节点内存 → 从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

21.kafka中的选举?

因为kafka中涉及多处选举机制,容易搞混,kafka由三个方面会涉及到选举:

  • broker(控制器)选leader
  • 分区多副本选leader
  • 消费者选leader

在kafka集群中由很多的broker(也叫做控制器),但是他们之间需要选举出一个leader,其他的都是follower。broker的leader有很重要的作用,诸如:创建、删除主题、增加分区并分配leader分区;集群broker管理,包括新增、关闭和故障处理;分区重分配(auto.leader.rebalance.enable=true,后面会介绍),分区leader选举。

每个broker都有唯一的brokerId,他们在启动后会去竞争注册zookeeper上的Controller结点,谁先抢到,谁就是broker leader。而其他broker会监听该结点事件,以便后续leader下线后触发重新选举。

22.消息格式

下图中左边的 “RECORD” 部分就是 v0 版本的消息格式,每个 RECORD ( v0 和 v1 版)必定对应一个 offset 和 message size。offset 用来标志它在 Partition 中的偏移量 ,这个 offset 是逻辑值,而非实际物理偏移值,message size 表示消息的大小,这两者在一起被称为日志头部 (LOG_OVERHEAD ) ,固定为12B 。LOG_OVERHEAD 和 RECORD 一起用来描述一条消息。
与消息对应的还有消息集的概念(详细结构参考下图中的右边部分),消息集中包含一条或多条消息,消息集不仅是存储于磁盘及在网络上传输(Produce & Fetch)的基本形式,而且是 Kafka 中压缩的基本单元。

  • crc32 (4B):crc32 校验值 。校验范围为 magic 至 value 之间。
  • magic (1B):消息格式版本号,此版本的 magic 值为 0。
  • attributes (1B):消息的属性。总共占 1 个字节,低 3 位表示压缩类型:0 表示 NONE、1 表示 GZIP、2 表示 SNAPPY、3 表示 LZ4 (LZ4 自 Kafka 0.9.x 引入),其余位保留。
  • key length (4B):表示消息的 key 的长度。如果为 -1,则表示没有设置 key ,即 key= null。
  • key:可选,如果没有 key 则无此字段。
  • value length (4B): 实际消息体的长度。如果为 -1,则表示消息为空。
  • value:消息体。可以为空,比如墓碑(tombstone)消息。

【檀越剑指大厂—kafka】kafka高阶篇_kafka_22

五.kafka 文件

1.kafka 文件存储机制

主题和分区是 Kafka 的两个核心概念。主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消
息的二次归类。分区的划分不仅为 Kafka 提供了可伸缩性、水平扩展的功能,还通过多副本机制来为 Kafka 提供数据冗余以提高数据可靠性。

从 Kafka 的底层实现来说,主题和分区都是逻辑上的概念,分区可以有一至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment) ,每个日志分段还可以细分为索引文件、日志存储文件和快照文件等。不过对于使用 Kafka 进,行消息收发的普通用户而言,了解到分区这一层面足以应对大部分的使用场景。

Topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 Producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。 每个 segment 包括:“index”文件、“.log”文件和.timeindex 等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic 名称+分区序号,例如:first-0。

  • .log 日志文件
  • .index 偏移量索引文件
  • .timeindex 时间戳索引文件
  • 其他文件

说明:index 和 log 文件以当前 segment 的第一条消息的 offset 命名。

【檀越剑指大厂—kafka】kafka高阶篇_java_23

2.日志索引

每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。偏移量索引文件用来建立消息偏移量
(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。

Kafka 中的索引文件以稀疏索引(sparse index) 的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes 指定,默认值为 4096,即 4KB) 的消息时,偏移量索引文件和时间戳索引文,件分别增加一个偏移量索引项和时间戳索引项,增大或减小
log.index.interval.bytes 的值,对应地可以增加或缩小索引项的密度。

稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。

为了便于消息的检索,每个 LogSegment 中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件:偏移量索引文件(以“.index”为文件后缀)和时间戳索引文件(以“.timeindex”为文件后缀)。每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment 中第一条消息的 offset。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为20位数字,没有达到的位数则用0填充。比如第一个 LogSegment 的基准偏移量为0,对应的日志文件为00000000000000000000.log。

3.log 文件和 index 文件

  1. 根据目标 offset 定位 Segment 文件
  2. 找到小干等于目标 offset 的最大 offset 对应的索引项
  3. 定位到 log 文件
  4. 向下遍历找到目标 Record

比如:要查找绝对 offset 为 7 的 Message

1、用二分查找确定它是在哪个 LogSegment 中,自然是在第一个 Segment 中。
2、打开这个 Segment 的 index 文件,也是用二分查找找到 offset 小于或者等于指定 offset 的索引条目中最大的那个 offset。自然 offset 为 6 的那个索引是我们要找的,通过索引文件我们知道 offset 为 6 的 Message 在数据文件中的位置为 9807。
3、打开数据文件,从位置为 9807 的那个地方开始顺序扫描直到找到 offset 为 7 的那条 Message。

【檀越剑指大厂—kafka】kafka高阶篇_幂等性_24

注意

  1. index 为稀疏索引,大约每往 log 文件写入 4kb 数据,会往 index 文件写入一条索引。参数 log.index.interval.bytes 默认 4kb。
  2. index 文件中保存的 offset 为相对 offset, 这样能确保 offset 的值所占空间不会过大,因此能将 offset 的值控制在固定大小

4.文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

  • log.retention.hours,最低优先级小时,默认了天。
  • log.retention.minutes,分钟。
  • log.retention.ms,最高优先级亳秒。。
  • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。

那么日志一旦超过了设置的时间,怎么处理呢?
Kalka 中提供的日志清理策略有 delete 和 compact 两种。

5.日志删除策略

delete 日志删除:将过期数据删除

  • log.cleanup.policy =delete 所有数据启用删除策略
  1. 基于时间:默认打开。以 segment 中所有记录中的最大时间截作为该文件时间戳。
  2. 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。
  • Iog.retention.bytes,默认等于-1,表示无穷大。

思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?

只有最大的过期才能删除

kafka 提供了一个墓碑消息(tombstone)的概念。如果一条消息的 key 不为 null,但是其 value 为 null,那么此消息就是墓碑消息。日志清理线程发现墓碑消息时会先进行常规的清理,并保留墓碑消息一段时间。

6.日志整理策略

LogCompact日志整理

compact 日志压缩:对于相同 key 的不同 value 值,只保留最后一个版本。类似 redis 中的 aof 重写

  • log.cleanup.policy= compact 所有数据启用压缩策略

【檀越剑指大厂—kafka】kafka高阶篇_kafka_25

7.高效读写数据

  1. Kafka 本身是分布式集群,可以采用分区技术,并行度高
  2. 读数据采用稀疏索引,可以快速定位要消费的数据
  3. 顺序写磁盘,Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁密的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

8.页缓存和零拷贝

零拷贝:Kafka 的数据加工处理操作交由 Kafka 生产者和 Katka 消费者处理。Kafka Broker 应用层不关心存储的数据,所以就不用走应用层,传输效率高。
PageCache 页缓存:Katka 重度依赖底层操作系统提供的 PageCache 功能。当上层有写操作时,操作系统只是将数据写入 PageCache。 当读操作发生时,先从 PageCache 中查找,如果找不到,再去磁盘中读取。实际上 PageCache 是把尽可能多的空闲内存都当做了磁盘缓存来使用。

页缓存是操作系统实现的一种主要的磁盘缓存 ,以此用来减少对磁盘I/O操作,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。

当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页 (page)是否在页缓存(pagecache)中,如果存在(命中) 则直接返回数据,从而避免了对物理磁盘的 I/O 操作;如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性 。

对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中,因此同一份数据有可能被缓存了两次。并且除非使用Direct I/O的方式, 否则页缓存很难被禁止。此外,用过Java的人一般都知道两点事实:对象的内存开销非常大,通常会是真实数据大小的几倍甚至更多,空间使用率低下;Java 的垃圾回收会随着堆内数据的增多而变得越来越慢。基于这些因素,使用文件系统并依赖于页缓存的做法明显要优于维护一个进程内缓存或其他结构,至少我们可以省去了一份进程内部的缓存消耗,同时还可以通过结构紧凑的字节码来替代使用对象的方式以节省更多的空间。如此,我们可以在32GB的机器上使用28GB至30GB的内存而不用担心GC所带来的性能问题。此外,即使Kafka服务重启,页缓存还是会保持有效,然而进程内的缓存却需要重建。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会比进程内维护更加安全有效。

Kafka中大量使用了页缓存,这是Kafka实现高吞吐的重要因素之一。 虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的,但在Kafka中同样提供了同步刷盘及间断性强制刷盘( fsync )的功能,这些功能可以通过log.flush.interval.messages、log.flush.interval.ms等参数来控制。同步刷盘可以提高消息的可靠性,防止由于机器断电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过并不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。

【檀越剑指大厂—kafka】kafka高阶篇_kafka_26

9.如何查找消息?

假如现在需要查找一个offset为368801的message是什么样的过程呢?

  1. 先找到offset的368801message所在的segment文件(利用二分法查找),这里找到的就是在第二个segment文件。
  2. 打开找到的segment中的.index文件(也就是368796.index文件,该文件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这里要查找的相对offset为5)。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。
  3. 根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。
  4. 这套机制是建立在offset为有序的基础上,利用segment+有序offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。

【檀越剑指大厂—kafka】kafka高阶篇_java_27

六.常见问题

1.kafka 为什么快?

写入:顺序写入和 MMFile(Memory Mapped Files 内存映射文件),批量压缩,稀疏索引

MMFile 它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上

读取:基于 sendfile 实现零拷贝

  • kafka是分布式集群,采用分区方式,并行操作
  • 读取数据采用稀疏索引,可以快速定位消费数据
  • 顺序写磁盘
  • 页缓冲和零拷贝

2.说说零拷贝?

传统模式下,当需要对一个文件进行传输的时候,其具体流程细节如下:

  • 调用 read 函数,文件数据从磁盘经过DMA copy 到内核缓冲区
  • read 函数返回,文件数据从内核缓冲区cpu copy 到用户缓冲区
  • write 函数调用,将文件数据从用户缓冲区cpu copy 到内核与 socket 相关的缓冲区
  • 数据从 socket 缓冲区DMA copy 到网卡的NIC Buffer

以上细节是传统 read/write 方式进行网络文件传输的方式,我们可以看到,在这个过程当中,文件数据实际上是经过了四次 copy 操作:硬盘—>内核 buf—>用户 buf—>socket 相关缓冲区—>网卡

【檀越剑指大厂—kafka】kafka高阶篇_java_28

而 sendfile 系统调用则提供了一种减少以上多次 copy,提升文件传输性能的方法。在内核版本 2.1 中,引入了 sendfile 系统调用,以简化网络上和两个本地文件之间的数据传输。 sendfile 的引入不仅减少了数据复制,还减少了上下文切换。

  • sendfile 系统调用,文件数据被 copy 至内核缓冲区
  • 再从内核缓冲区 copy 至内核中 socket 相关的缓冲区
  • 最后再 socket 相关的缓冲区 copy 到协议引擎

【檀越剑指大厂—kafka】kafka高阶篇_数据_29

DMA(Direct Memory Access,直接内存存取) 是所有现代计算机的重要特色,它允许不同速度的硬件装置直接沟通,而不需要依于CPU的大量中断负载。在现代计算机中,运算单元不再仅仅是cpu。网卡/磁盘等都可以认为是DMA设备,是一个半自治单元,比如网卡有它自己的运算单元(相当于特异化的cpu)和自己的缓存,网卡接收和发送数据时是不需要cpu的全程参与的,磁盘也是类似的。简单来讲就是dma设备就是cpu领导下的一个不太聪明的小弟,cpu负责指挥小弟去干活,但干活的过程中是不需要cpu参与的。nio和0拷贝都是为了解放cpu。

Java中的零拷贝是依靠java.nio.channels.FileChannel中的transferTo(long position, long count, WritableByteChannel target)方法来实现的。transferTo方法的底层实现是基于操作系统的sendfile这个system call来实现的,无需将数据拷贝到用户态,sendfile负责把数据从某个fd(file descriptor)传输到另一个fd。这样就完成了零拷贝的过程。

mmap 和 sendfile总结
1、都是Linux内核提供、实现零拷贝的API;
2、sendfile 是将读到内核空间的数据,转到socket buffer,进行网络发送;
3、mmap将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上。
RocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。

3.为什么去 zookeeper?

这样做的好处有以下几个:

  • Ka3fka 不再依赖外部框架,而是能够独立运行;
  • controller 管理集群时, 不再需要从 zookeeper 中 先读取数据,集群性能上升;
  • 由于不依赖 zookeeper ,集群扩展时不再受到 zookeeper 读写能力限制;
  • controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强 controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策

4.什么是 kraft 架构?

左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller ,由 controller 进行 Kafka 集群管理。其中 Zookeeper 集群是 Kafka 用来负责集群元数据的管理、控制器的选举等
右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper ,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理

【檀越剑指大厂—kafka】kafka高阶篇_kafka_30

5.什么是死信队列?

由于某些原因消息无法被正确地投递,为了确保消息不会被无故地丢弃,一般将其置于一个特殊角色的队列,这个队列一般称为死信队列。后续分析程序可以通过消费这个死信队列中的内容来分析当时遇到的异常情况,进而可以改善和优化系统。

6.kafka与rabbitmq的对比

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

【檀越剑指大厂—kafka】kafka高阶篇_分布式_31

【檀越剑指大厂—kafka】kafka高阶篇_分布式_32

1、Kafka可以保证顺序处理消息,RabbitMQ相对较弱。

2、在消息路由和过滤方面,RabbitMQ提供了更好的支持。

3、RabbitMQ有消息存活时间(TTL)和延迟/预定消息功能,Kafka没有。

4、在消息留存方面,RabbitMQ消息一旦消费成功就会删除,反之处理失败则放回,但Kafka会保留消息,根据超时时间来删除消息,所以Kafka可以反复消费消息。

5、在容错处理上,RabbitMQ提供了诸如交付重试和死信交换器(DLX)来处理消息处理故障,相反,Kafka没有提供这种开箱即用的机制,需要在应用层提供和实现消息的重试机制。

6、在伸缩方面,通常Kafka(使用顺序磁盘I/O来提供性能)被认为比RabbitMQ有更优越的性能,从Kafka使用分区的架构上看,它在横向扩展上会优于RabbitMQ,当然,RabbitMQ在纵向扩展上会有更多的优势,而且在吞吐量上,Kafka每秒可处理十几万消息,RabbitMQ每秒可处理几万消息,如果系统达不到百万级用户量,可以不关心伸缩性问题。

7、RabbitMQ(智能代理和傻瓜式消费者模式)比Kafka(傻瓜式代理和智能消费者模式)在消费者复杂度上更简单。

Rabbitmq比kafka可靠,kafka更适合IO高吞吐的处理,比如ELK日志收集

Kafka和RabbitMq一样是通用意图消息代理,他们都是以分布式部署为目的。但是他们对消息语义模型的定义的假设是非常不同的。

a) 以下场景比较适合使用Kafka。如果有大量的事件(10万以上/秒)、你需要以分区的,顺序的,至少传递成功一次到混杂了在线和打包消费的消费者、希望能重读消息、你能接受目前是有限的节点级别高可用就可以考虑kafka。

  • 严格的消息顺序
  • 延长消息留存时间,包括过去消息重放的可能
  • 传统解决方案无法满足的高伸缩能力

b) 以下场景比较适合使用RabbitMQ。如果是较少的事件(2万以上/秒)并且需要通过复杂的路由逻辑去找到消费者、你希望消息传递是可靠的、并不关心消息传递的顺序、而且需要现在就支持集群-节点级别的高可用就可以考虑rabbitmq。

  • 高级灵活的路由规则
  • 消息时序控制(控制消息过期或消息延迟)
  • 高级的容错处理能力,在消费者更有可能处理消息不成功的情景中(瞬时或持久)
  • 更简单的消费者实现


举报

相关推荐

0 条评论