A Clear Explanation of Kafka's Message Fetch Process

大柚子top 2023-11-15

Kafka is a high-throughput messaging middleware, and of course there are reasons behind that. In this post I dissect the broker implementation through one concrete scenario: how the broker serves a fetch (consume) request.

Question: when a Kafka consumer polls, the fetch request it sends carries a max-wait timeout. How does the broker handle that parameter? If the leader broker already has messages, it obviously returns right away. If it does not, Kafka presumably waits up to the timeout before responding. And if new messages arrive in the topic partition while the broker is waiting, how is the response triggered back to the client?
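One note on where those parameters come from: the wait the broker honors is not the argument of poll() but the max-wait field carried in the FetchRequest itself, which the consumer derives from fetch.max.wait.ms (default 500 ms); likewise fetch.min.bytes (default 1) becomes the broker-side fetchMinBytes we will see below. A minimal consumer sketch, assuming a local broker on localhost:9092 and a made-up topic name demo-topic:

import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

object FetchWaitDemo extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // assumed broker address
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "fetch-wait-demo")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  // These two settings become the broker-side parameters discussed below:
  //   fetch.max.wait.ms -> the `timeout` argument of ReplicaManager#fetchMessages
  //   fetch.min.bytes   -> `fetchMinBytes`
  props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "5000")
  props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List("demo-topic").asJava)                          // hypothetical topic
  val records = consumer.poll(Duration.ofSeconds(10))                    // client-side bound, separate from the broker wait
  records.asScala.foreach(r => println(s"${r.partition}-${r.offset}: ${r.value}"))
  consumer.close()
}

With these settings the broker will hold an empty fetch for up to 5 seconds, returning earlier as soon as at least 1 KB of new data becomes readable.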

1. I previously introduced DelayedOperationPurgatory, the core class for handling delayed operations. On the broker, a consumer fetch is wrapped into a DelayedFetch, which extends DelayedOperation.
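Before reading the broker code it helps to keep the DelayedOperation contract in mind. The sketch below is a deliberately simplified model written for this post, not the actual Kafka class, which also handles locking, the timing wheel and metrics:

// Simplified model of kafka.server.DelayedOperation's contract, for intuition only.
abstract class SimpleDelayedOperation(val delayMs: Long) {
  private val completed = new java.util.concurrent.atomic.AtomicBoolean(false)

  /** Check whether the operation can be satisfied now; if so, force completion and return true. */
  def tryComplete(): Boolean

  /** Called exactly once when the operation completes, whether satisfied or expired. */
  def onComplete(): Unit

  /** Called when delayMs elapses without the operation being satisfied. */
  def onExpiration(): Unit

  /** Complete at most once, then run onComplete(). */
  final def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false
}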

The broker-side call path for handling a fetch request is:

kafka.server.KafkaApis#handleFetchRequest
  kafka.server.ReplicaManager#fetchMessages

Let's walk through the logic of ReplicaManager#fetchMessages:

1   /**
 2    * Fetch messages from a replica, and wait until enough data can be fetched and return;
 3    * the callback function will be triggered either when timeout or required fetch info is satisfied.
 4    * Consumers may fetch from any replica, but followers can only fetch from the leader.
 5    */
 6   def fetchMessages(timeout: Long,
 7                     replicaId: Int,
 8                     fetchMinBytes: Int,
 9                     fetchMaxBytes: Int,
10                     hardMaxBytesLimit: Boolean,
11                     fetchInfos: Seq[(TopicPartition, PartitionData)],
12                     quota: ReplicaQuota,
13                     responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
14                     isolationLevel: IsolationLevel,
15                     clientMetadata: Option[ClientMetadata]): Unit = {
16     val isFromFollower = Request.isValidBrokerId(replicaId)
17     val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId)
18     val fetchIsolation = if (!isFromConsumer)
19       FetchLogEnd
20     else if (isolationLevel == IsolationLevel.READ_COMMITTED)
21       FetchTxnCommitted
22     else
23       FetchHighWatermark
24 
25     // Restrict fetching to leader if request is from follower or from a client with older version (no ClientMetadata)
26     val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)
27     def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
28       val result = readFromLocalLog(
29         replicaId = replicaId,
30         fetchOnlyFromLeader = fetchOnlyFromLeader,
31         fetchIsolation = fetchIsolation,
32         fetchMaxBytes = fetchMaxBytes,
33         hardMaxBytesLimit = hardMaxBytesLimit,
34         readPartitionInfo = fetchInfos,
35         quota = quota,
36         clientMetadata = clientMetadata)
37       if (isFromFollower) updateFollowerFetchState(replicaId, result)
38       else result
39     }
40 
41     val logReadResults = readFromLog()
42 
43     // check if this fetch request can be satisfied right away
44     var bytesReadable: Long = 0
45     var errorReadingData = false
46     var hasDivergingEpoch = false
47     val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
48     logReadResults.foreach { case (topicPartition, logReadResult) =>
49       brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
50       brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
51 
52       if (logReadResult.error != Errors.NONE)
53         errorReadingData = true
54       if (logReadResult.divergingEpoch.nonEmpty)
55         hasDivergingEpoch = true
56       bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
57       logReadResultMap.put(topicPartition, logReadResult)
58     }
59 
60     // respond immediately if 1) fetch request does not want to wait
61     //                        2) fetch request does not require any data
62     //                        3) has enough data to respond
63     //                        4) some error happens while reading data
64     //                        5) we found a diverging epoch
65     if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) {
66       val fetchPartitionData = logReadResults.map { case (tp, result) =>
67         val isReassignmentFetch = isFromFollower && isAddingReplica(tp, replicaId)
68         tp -> result.toFetchPartitionData(isReassignmentFetch)
69       }
70       responseCallback(fetchPartitionData)
71     } else {
72       // construct the fetch results from the read results
73       val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
74       fetchInfos.foreach { case (topicPartition, partitionData) =>
75         logReadResultMap.get(topicPartition).foreach(logReadResult => {
76           val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
77           fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
78         })
79       }
80       val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
81         fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
82       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
83         responseCallback)
84 
85       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
86       val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
87 
88       // try to complete the request immediately, otherwise put it into the purgatory;
89       // this is because while the delayed fetch operation is being created, new requests
90       // may arrive and hence make this operation completable.
91       delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
92     }
93   }


Line 82 creates a DelayedFetch object; line 91 then tries to complete it immediately and, if it cannot be completed yet, puts it under watch in the purgatory. The completion attempt calls DelayedFetch#tryComplete: on an obvious error it completes right away, if enough data is available it completes, and if there is no data it simply returns false without calling forceComplete, which is precisely the scenario we care about. (A simplified sketch of the purgatory's watch-and-poke cycle follows the listing below.)

1 /**
  2  * A delayed fetch operation that can be created by the replica manager and watched
  3  * in the fetch operation purgatory
  4  */
  5 class DelayedFetch(delayMs: Long,
  6                    fetchMetadata: FetchMetadata,
  7                    replicaManager: ReplicaManager,
  8                    quota: ReplicaQuota,
  9                    clientMetadata: Option[ClientMetadata],
 10                    responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit)
 11   extends DelayedOperation(delayMs) {
 12 
 13   /**
 14    * The operation can be completed if:
 15    *
 16    * Case A: This broker is no longer the leader for some partitions it tries to fetch
 17    * Case B: The replica is no longer available on this broker
 18    * Case C: This broker does not know of some partitions it tries to fetch
 19    * Case D: The partition is in an offline log directory on this broker
 20    * Case E: This broker is the leader, but the requested epoch is now fenced
 21    * Case F: The fetch offset locates not on the last segment of the log
 22    * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
 23    * Case H: A diverging epoch was found, return response to trigger truncation
 24    * Upon completion, should return whatever data is available for each valid partition
 25    */
 26   override def tryComplete(): Boolean = {
 27     var accumulatedSize = 0
 28     fetchMetadata.fetchPartitionStatus.foreach {
 29       case (topicPartition, fetchStatus) =>
 30         val fetchOffset = fetchStatus.startOffsetMetadata
 31         val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
 32         try {
 33           if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
 34             val partition = replicaManager.getPartitionOrException(topicPartition)
 35             val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader)
 36 
 37             val endOffset = fetchMetadata.fetchIsolation match {
 38               case FetchLogEnd => offsetSnapshot.logEndOffset
 39               case FetchHighWatermark => offsetSnapshot.highWatermark
 40               case FetchTxnCommitted => offsetSnapshot.lastStableOffset
 41             }
 42 
 43             // Go directly to the check for Case G if the message offsets are the same. If the log segment
 44             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
 45             // which would incorrectly be seen as an instance of Case F.
 46             if (endOffset.messageOffset != fetchOffset.messageOffset) {
 47               if (endOffset.onOlderSegment(fetchOffset)) {
 48                 // Case F, this can happen when the new fetch operation is on a truncated leader
 49                 debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
 50                 return forceComplete()
 51               } else if (fetchOffset.onOlderSegment(endOffset)) {
 52                 // Case F, this can happen when the fetch operation is falling behind the current segment
 53                 // or the partition has just rolled a new segment
 54                 debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
 55                 // We will not force complete the fetch request if a replica should be throttled.
 56                 if (!replicaManager.shouldLeaderThrottle(quota, partition, fetchMetadata.replicaId))
 57                   return forceComplete()
 58               } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
 59                 // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
 60                 val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
 61                 if (!replicaManager.shouldLeaderThrottle(quota, partition, fetchMetadata.replicaId))
 62                   accumulatedSize += bytesAvailable
 63               }
 64             }
 65 
 66             // Case H: If truncation has caused diverging epoch while this request was in purgatory, return to trigger truncation
 67             fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch =>
 68               val epochEndOffset = partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false)
 69               if (epochEndOffset.errorCode != Errors.NONE.code()
 70                   || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET
 71                   || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
 72                 debug(s"Could not obtain last offset for leader epoch for partition $topicPartition, epochEndOffset=$epochEndOffset.")
 73                 return forceComplete()
 74               } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
 75                 debug(s"Satisfying fetch $fetchMetadata since it has diverging epoch requiring truncation for partition " +
 76                   s"$topicPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
 77                 return forceComplete()
 78               }
 79             }
 80           }
 81         } catch {
 82           case _: NotLeaderOrFollowerException =>  // Case A or Case B
 83             debug(s"Broker is no longer the leader or follower of $topicPartition, satisfy $fetchMetadata immediately")
 84             return forceComplete()
 85           case _: UnknownTopicOrPartitionException => // Case C
 86             debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchMetadata immediately")
 87             return forceComplete()
 88           case _: KafkaStorageException => // Case D
 89             debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
 90             return forceComplete()
 91           case _: FencedLeaderEpochException => // Case E
 92             debug(s"Broker is the leader of partition $topicPartition, but the requested epoch " +
 93               s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $fetchMetadata immediately")
 94             return forceComplete()
 95         }
 96     }
 97 
 98     // Case G
 99     if (accumulatedSize >= fetchMetadata.fetchMinBytes)
100        forceComplete()
101     else
102       false
103   }
104 
105   override def onExpiration(): Unit = {
106     if (fetchMetadata.isFromFollower)
107       DelayedFetchMetrics.followerExpiredRequestMeter.mark()
108     else
109       DelayedFetchMetrics.consumerExpiredRequestMeter.mark()
110   }
111 
112   /**
113    * Upon completion, read whatever data is available and pass to the complete callback
114    */
115   override def onComplete(): Unit = {
116     val logReadResults = replicaManager.readFromLocalLog(
117       replicaId = fetchMetadata.replicaId,
118       fetchOnlyFromLeader = fetchMetadata.fetchOnlyLeader,
119       fetchIsolation = fetchMetadata.fetchIsolation,
120       fetchMaxBytes = fetchMetadata.fetchMaxBytes,
121       hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
122       readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
123       clientMetadata = clientMetadata,
124       quota = quota)
125 
126     val fetchPartitionData = logReadResults.map { case (tp, result) =>
127       val isReassignmentFetch = fetchMetadata.isFromFollower &&
128         replicaManager.isAddingReplica(tp, fetchMetadata.replicaId)
129 
130       tp -> result.toFetchPartitionData(isReassignmentFetch)
131     }
132 
133     responseCallback(fetchPartitionData)
134   }
135 }

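To make the tryCompleteElseWatch and checkAndComplete round trip concrete, here is a toy purgatory built on the simplified operation sketched earlier. It is an illustration under my own assumptions, not the real DelayedOperationPurgatory, which additionally drives an expiration timer, purges completed watchers and guards everything with locks:

import scala.collection.mutable

// Toy purgatory illustrating tryCompleteElseWatch / checkAndComplete.
class SimplePurgatory[K] {
  private val watchers = mutable.Map.empty[K, mutable.ListBuffer[SimpleDelayedOperation]]

  /** Try to complete immediately; otherwise watch the operation under every key. */
  def tryCompleteElseWatch(op: SimpleDelayedOperation, keys: Seq[K]): Boolean = {
    if (op.tryComplete()) return true
    keys.foreach(k => watchers.getOrElseUpdate(k, mutable.ListBuffer.empty) += op)
    // Second attempt: new data may have arrived while the operation was being registered.
    op.tryComplete()
  }

  /** Called when something happens on `key` (e.g. the HW advanced): retry the watched operations. */
  def checkAndComplete(key: K): Int =
    watchers.get(key) match {
      case Some(ops) =>
        val done = ops.filter(_.tryComplete())
        ops --= done
        done.size
      case None => 0
    }
}

The second tryComplete call inside tryCompleteElseWatch mirrors the comment at lines 88-90 of fetchMessages: requests arriving while the delayed fetch is being created may already make it completable.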

 

2. Partition followers keep fetching messages from the leader and report their own replication progress. When the partition leader sees the partition's HW (high watermark) advance, new messages have become visible to consumers, and that is the moment to trigger the DelayedFetch operations parked in the purgatory.

1 object Partition extends KafkaMetricsGroup {
 2   def apply(topicPartition: TopicPartition,
 3             time: Time,
 4             configRepository: ConfigRepository,
 5             replicaManager: ReplicaManager): Partition = {
 6 
 7     val isrChangeListener = new IsrChangeListener {
 8       override def markExpand(): Unit = {
 9         replicaManager.isrExpandRate.mark()
10       }
11 
12       override def markShrink(): Unit = {
13         replicaManager.isrShrinkRate.mark()
14       }
15 
16       override def markFailed(): Unit = replicaManager.failedIsrUpdatesRate.mark()
17     }
18 
19     val delayedOperations = new DelayedOperations(
20       topicPartition,
21       replicaManager.delayedProducePurgatory,
22       replicaManager.delayedFetchPurgatory,
23       replicaManager.delayedDeleteRecordsPurgatory)
24 
25     new Partition(topicPartition,
26       replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs,
27       interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
28       localBrokerId = replicaManager.config.brokerId,
29       time = time,
30       isrChangeListener = isrChangeListener,
31       delayedOperations = delayedOperations,
32       metadataCache = replicaManager.metadataCache,
33       logManager = replicaManager.logManager,
34       alterIsrManager = replicaManager.alterIsrManager)
35   }
36 
37   def removeMetrics(topicPartition: TopicPartition): Unit = {
38     val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString)
39     removeMetric("UnderReplicated", tags)
40     removeMetric("UnderMinIsr", tags)
41     removeMetric("InSyncReplicasCount", tags)
42     removeMetric("ReplicasCount", tags)
43     removeMetric("LastStableOffsetLag", tags)
44     removeMetric("AtMinIsr", tags)
45   }
46 }


At line 19, the delayedOperations field of Partition wraps the delayedFetchPurgatory. Followers keep fetching from the leader and report their LEO (log end offset) back to it; once the leader determines that the HW has grown, new messages have become visible to consumers.

// kafka.cluster.Partition#updateFollowerFetchState

1   /**
 2    * Update the follower's state in the leader based on the last fetch request. See
 3    * [[Replica.updateFetchState()]] for details.
 4    *
 5    * @return true if the follower's fetch state was updated, false if the followerId is not recognized
 6    */
 7   def updateFollowerFetchState(followerId: Int,
 8                                followerFetchOffsetMetadata: LogOffsetMetadata,
 9                                followerStartOffset: Long,
10                                followerFetchTimeMs: Long,
11                                leaderEndOffset: Long): Boolean = {
12     getReplica(followerId) match {
13       case Some(followerReplica) =>
14         // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
15         val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
16         val prevFollowerEndOffset = followerReplica.logEndOffset
17         followerReplica.updateFetchState(
18           followerFetchOffsetMetadata,
19           followerStartOffset,
20           followerFetchTimeMs,
21           leaderEndOffset)
22 
23         val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
24         // check if the LW of the partition has incremented
25         // since the replica's logStartOffset may have incremented
26         val leaderLWIncremented = newLeaderLW > oldLeaderLW
27 
28         // Check if this in-sync replica needs to be added to the ISR.
29         maybeExpandIsr(followerReplica, followerFetchTimeMs)
30 
31         // check if the HW of the partition can now be incremented
32         // since the replica may already be in the ISR and its LEO has just incremented
33         val leaderHWIncremented = if (prevFollowerEndOffset != followerReplica.logEndOffset) {
34           // the leader log may be updated by ReplicaAlterLogDirsThread so the following method must be in lock of
35           // leaderIsrUpdateLock to prevent adding new hw to invalid log.
36           inReadLock(leaderIsrUpdateLock) {
37             leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
38           }
39         } else {
40           false
41         }
42 
43         // some delayed operations may be unblocked after HW or LW changed
44         if (leaderLWIncremented || leaderHWIncremented)
45           tryCompleteDelayedRequests()
46 
47         debug(s"Recorded replica $followerId log end offset (LEO) position " +
48           s"${followerFetchOffsetMetadata.messageOffset} and log start offset $followerStartOffset.")
49         true
50 
51       case None =>
52         false
53     }
54   }


At line 44, the partition leader has determined that the partition's HW grew, so some pending DelayedFetch operations can now be completed. This ultimately triggers DelayedFetch#onComplete, which reads the messages and returns them to the client. (A toy end-to-end sketch follows the listing below.)

// kafka.cluster.Partition#tryCompleteDelayedRequests
/**
 * Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
 */
private def tryCompleteDelayedRequests(): Unit = delayedOperations.checkAndCompleteAll()
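Putting the two sketches together, the trigger path described above can be modeled end to end: a fetch that finds no data gets parked, the HW later advances, the leader pokes the purgatory with that partition's key, and the watched operation re-runs tryComplete and responds. The wiring below is purely hypothetical toy code (the key "demo-topic-0" and every name in it are made up for illustration), not the broker's real code path:

object HwAdvanceDemo extends App {
  var highWatermark = 0L                 // stand-in for the partition's HW
  val fetchOffset = 0L                   // the consumer wants data starting at offset 0
  val purgatory = new SimplePurgatory[String]

  val delayedFetch = new SimpleDelayedOperation(delayMs = 5000) {
    def tryComplete(): Boolean =
      if (highWatermark > fetchOffset) forceComplete() else false   // "Case G" in miniature
    def onComplete(): Unit = println(s"respond with records up to HW=$highWatermark")
    def onExpiration(): Unit = println("timed out, respond with an empty fetch")
  }

  // The consumer's fetch arrives, no data yet, so it is parked in the purgatory.
  purgatory.tryCompleteElseWatch(delayedFetch, Seq("demo-topic-0"))

  // Later: replication advances the HW and the leader pokes the purgatory,
  // which is what Partition.tryCompleteDelayedRequests ultimately does.
  highWatermark = 42L
  purgatory.checkAndComplete("demo-topic-0")
}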

 
