0
点赞
收藏
分享

微信扫一扫

Kafka 源码解析之 Producer 单 Partition 顺序性实现及配置说明

陆佃 2021-10-09 阅读 89
kafka

今天把 Kafka Producer 最后一部分给讲述一下,Producer 大部分内容都已经在前面几篇文章介绍过了,这里简单做个收尾,但并不是对前面的总结,本文从两块来讲述:RecordAccumulator 类的实现、Kafka Producer 如何保证其顺序性以及 Kafka Producer 的配置说明,每个 Producer 线程都会有一个 RecordAccumulator 对象,它负责缓存要发送 RecordBatch、记录发送的状态并且进行相应的处理,这里会详细讲述 Kafka Producer 如何保证单 Partition 的有序性。最后,简单介绍一下 Producer 的参数配置说明,只有正确地理解 Producer 相关的配置参数,才能更好地使用 Producer,发挥其相应的作用。

RecordAccumulator

这里再看一下 RecordAccumulator 的数据结构,如下图所示,每个 topic-partition 都有一个对应的 deque,deque 中存储的是 RecordBatch,它是发送的基本单位,只有这个 topic-partition 的 RecordBatch 达到大小或时间要求才会触发发送操作(但并不是只有达到这两个条件之一才会被发送,这点要理解清楚)。

再看一下 RecordAccumulator 类的主要方法介绍,如下图所示。

这张图基本上涵盖了 RecordAccumulator 的主要方法,下面会选择其中几个方法详细讲述,会围绕着 Kafka Producer 如何实现单 Partition 顺序性这个主题来讲述。

mutePartition() 与 unmutePartition()

先看下 mutePartition() 与 unmutePartition() 这两个方法,它们是保证有序性关键之一,其主要做用就是将指定的 topic-partition 从 muted 集合中加入或删除,后面会看到它们的作用。

private final Set<TopicPartition> muted;

public void mutePartition(TopicPartition tp) {
    muted.add(tp);
}

public void unmutePartition(TopicPartition tp) {
    muted.remove(tp);
}

这里先说一下这两个方法调用的条件,这样的话,下面在介绍其他方法时才会更容易理解:

  • mutePartition():如果要求保证顺序性,那么这个 tp 对应的 RecordBatch 如果要开始发送,就将这个 tp 加入到 muted 集合中;
  • unmutePartition():如果 tp 对应的 RecordBatch 发送完成,tp 将会从 muted 集合中移除。

也就是说,muted 是用来记录这个 tp 是否有还有未完成的 RecordBatch。

ready()
ready() 是在 Sender 线程中调用的,其作用选择那些可以发送的 node,也就是说,如果这个 tp 对应的 batch 可以发送(达到时间或大小要求),就把 tp 对应的 leader 选出来。

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();

        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            if (leader == null && !deque.isEmpty()) {
                // This is a partition for which leader is not known, but messages are available to send.
                // Note that entries are currently not removed from batches when deque is empty.
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {//note: part 如果 mute 就不会遍历
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    //note: 是否是在重试
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    boolean full = deque.size() > 1 || batch.isFull(); //note: batch 满了
                    boolean expired = waitedTimeMs >= timeToWaitMs; //note: batch 超时
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);// note: 将可以发送的 leader 添加到集合中
                    } else {
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

可以看到这一行 (!readyNodes.contains(leader) && !muted.contains(part)),如果 muted 集合包含这个 tp,那么在遍历时将不会处理它对应的 deque,也就是说,如果一个 tp 加入了 muted 集合中,即使它对应的 RecordBatch 可以发送了,也不会触发引起其对应的 leader 被选择出来。

drain()
drain() 是用来遍历可发送请求的 node,然后再遍历在这个 node 上所有 tp,如果 tp 对应的 deque 有数据,将会被选择出来直到超过一个请求的最大长度(max.request.size)为止,也就说说即使 RecordBatch 没有达到条件,但为了保证每个 request 尽快多地发送数据提高发送效率,这个 RecordBatch 依然会被提前选出来并进行发送(这里不会吧??)。

//note: 返回该 node 对应的可以发送的 RecordBatch 的 batches,并从 queue 中移除(最大的大小为maxSize,超过的话,下次再发送)
public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
                                             Set<Node> nodes,
                                             int maxSize,
                                             long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        int size = 0;
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<RecordBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // Only proceed if the partition has no in-flight batches.
            if (!muted.contains(tp)) {//note: 被 mute 的 tp 依然不会被遍历
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // Only drain the batch if it is not during backoff period.
                            if (!backoff) {
                                if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // there is a rare case that a single batch size is larger than the request size due
                                    // to compression; in this case we will still eventually send this batch in a single
                                    // request
                                    break;
                                } else {
                                    RecordBatch batch = deque.pollFirst();
                                    batch.close();
                                    size += batch.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}

在遍历 node 的所有 tp 时,可以看到是有条件的 —— !muted.contains(tp),如果这个 tp 被添加到 muted 集合中,那么它将不会被遍历,也就不会作为 request 一部分被发送出去,这也就保证了 tp 如果还有未完成的 RecordBatch,那么其对应 deque 中其他 RecordBatch 即使达到条件也不会被发送,就保证了 tp 在任何时刻只有一个 RecordBatch 在发送。

顺序性如何保证?
是否保证顺序性,还是在 Sender 线程中实现的,mutePartition() 与 unmutePartition() 也都是在 Sender 中调用的,这里看一下 KafkaProducer 是如何初始化一个 Sender 对象的。

// from KafkaProducer
this.sender = new Sender(client,
                         this.metadata,
                         his.accumulator,
                         config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                         config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                         (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                         config.getInt(ProducerConfig.RETRIES_CONFIG),
                         this.metrics,
                         Time.SYSTEM,
                         this.requestTimeoutMs);//NOTE: Sender 实例,发送请求的后台线程

// from Sender
public Sender(KafkaClient client,
              Metadata metadata,
              RecordAccumulator accumulator,
              boolean guaranteeMessageOrder,
              int maxRequestSize,
              short acks,
              int retries,
              Metrics metrics,
              Time time,
              int requestTimeout) {
        this.client = client;
        this.accumulator = accumulator;
        this.metadata = metadata;
        this.guaranteeMessageOrder = guaranteeMessageOrder;
        this.maxRequestSize = maxRequestSize;
        this.running = true; //note: 默认为 true
        this.acks = acks;
        this.retries = retries;
        this.time = time;
        this.sensors = new SenderMetrics(metrics);
        this.requestTimeout = requestTimeout;
}

对于上述过程可以这样进行解读

this.guaranteeMessageOrder = (config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1)

如果 KafkaProducer 的 max.in.flight.requests.per.connection 设置为1,那么就可以保证其顺序性,否则的话,就不保证顺序性,从下面这段代码也可以看出。

//from Sender
//note: max.in.flight.requests.per.connection 设置为1时会保证
if (guaranteeMessageOrder) {
    // Mute all the partitions draine
    for (List<RecordBatch> batchList : batches.values()) {
         for (RecordBatch batch : batchList)
             this.accumulator.mutePartition(batch.topicPartition);
    }
}

也就是说,如果要保证单 Partition 的顺序性,需要在 Producer 中配置 max.in.flight.requests.per.connection=1,而其实现机制则是在 RecordAccumulator 中实现的。保证recordBatch按照deque先进先出的顺序发送。

举报

相关推荐

0 条评论