Kafka is a high-throughput messaging middleware, and of course there are reasons behind that. Today I'll dig into the broker implementation through one concrete scenario: how the broker serves a consumer's fetch.
The question: a Kafka consumer passes a timeout when it calls poll(); how does the broker handle the waiting? (On the wire, the fetch request actually carries a max wait time taken from the consumer's fetch.max.wait.ms setting, not the poll timeout itself.) If the leader broker has messages, it obviously returns right away. If it has none, the broker presumably waits up to that timeout before responding; but if new messages arrive in the topic partition while it is waiting, how does the broker trigger the response to the client?
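To keep the client side in view, here is a minimal consumer sketch. The topic name and group id are made up for illustration; fetch.max.wait.ms and fetch.min.bytes are the two settings that end up as the timeout and fetchMinBytes parameters examined below:

import java.time.Duration
import java.util.{Collections, Properties}
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object FetchWaitDemo extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "demo-group")                         // hypothetical group id
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[StringDeserializer].getName)
  props.put("fetch.max.wait.ms", "500")  // how long the broker may park the fetch request
  props.put("fetch.min.bytes", "1")      // the broker responds earlier once this many bytes are readable

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("demo-topic")) // hypothetical topic
  val records = consumer.poll(Duration.ofSeconds(1))          // client-side blocking time
  records.asScala.foreach(r => println(s"${r.offset}: ${r.value}"))
  consumer.close()
}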
1. I covered DelayedOperationPurgatory before: it is the core class for handling delayed operations. A consumer fetch is wrapped into a DelayedFetch, which extends DelayedOperation.
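As a refresher, the contract DelayedOperation imposes on its subclasses boils down to the following stripped-down sketch (illustrative only; the real class also hooks into an expiration timer):

// Stripped-down sketch of the DelayedOperation contract (not the actual Kafka source).
abstract class DelayedOperationSketch(delayMs: Long) {
  private val completed = new java.util.concurrent.atomic.AtomicBoolean(false)

  // Check whether the operation can be satisfied now; if so, it must call forceComplete().
  def tryComplete(): Boolean

  // Build and send the response; runs at most once, always via forceComplete().
  def onComplete(): Unit

  // Bookkeeping for the case where delayMs elapses before the operation completes.
  def onExpiration(): Unit

  // Completes the operation at most once, whether triggered by new data or by expiration.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false
}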
The broker-side entry points for handling a fetch request are:
kafka.server.KafkaApis#handleFetchRequest
kafka.server.ReplicaManager#fetchMessages
Let's walk through the logic of ReplicaManager#fetchMessages:
1 /**
2 * Fetch messages from a replica, and wait until enough data can be fetched and return;
3 * the callback function will be triggered either when timeout or required fetch info is satisfied.
4 * Consumers may fetch from any replica, but followers can only fetch from the leader.
5 */
6 def fetchMessages(timeout: Long,
7 replicaId: Int,
8 fetchMinBytes: Int,
9 fetchMaxBytes: Int,
10 hardMaxBytesLimit: Boolean,
11 fetchInfos: Seq[(TopicPartition, PartitionData)],
12 quota: ReplicaQuota,
13 responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
14 isolationLevel: IsolationLevel,
15 clientMetadata: Option[ClientMetadata]): Unit = {
16 val isFromFollower = Request.isValidBrokerId(replicaId)
17 val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId)
18 val fetchIsolation = if (!isFromConsumer)
19 FetchLogEnd
20 else if (isolationLevel == IsolationLevel.READ_COMMITTED)
21 FetchTxnCommitted
22 else
23 FetchHighWatermark
24
25 // Restrict fetching to leader if request is from follower or from a client with older version (no ClientMetadata)
26 val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)
27 def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
28 val result = readFromLocalLog(
29 replicaId = replicaId,
30 fetchOnlyFromLeader = fetchOnlyFromLeader,
31 fetchIsolation = fetchIsolation,
32 fetchMaxBytes = fetchMaxBytes,
33 hardMaxBytesLimit = hardMaxBytesLimit,
34 readPartitionInfo = fetchInfos,
35 quota = quota,
36 clientMetadata = clientMetadata)
37 if (isFromFollower) updateFollowerFetchState(replicaId, result)
38 else result
39 }
40
41 val logReadResults = readFromLog()
42
43 // check if this fetch request can be satisfied right away
44 var bytesReadable: Long = 0
45 var errorReadingData = false
46 var hasDivergingEpoch = false
47 val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
48 logReadResults.foreach { case (topicPartition, logReadResult) =>
49 brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
50 brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
51
52 if (logReadResult.error != Errors.NONE)
53 errorReadingData = true
54 if (logReadResult.divergingEpoch.nonEmpty)
55 hasDivergingEpoch = true
56 bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
57 logReadResultMap.put(topicPartition, logReadResult)
58 }
59
60 // respond immediately if 1) fetch request does not want to wait
61 // 2) fetch request does not require any data
62 // 3) has enough data to respond
63 // 4) some error happens while reading data
64 // 5) we found a diverging epoch
65 if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) {
66 val fetchPartitionData = logReadResults.map { case (tp, result) =>
67 val isReassignmentFetch = isFromFollower && isAddingReplica(tp, replicaId)
68 tp -> result.toFetchPartitionData(isReassignmentFetch)
69 }
70 responseCallback(fetchPartitionData)
71 } else {
72 // construct the fetch results from the read results
73 val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
74 fetchInfos.foreach { case (topicPartition, partitionData) =>
75 logReadResultMap.get(topicPartition).foreach(logReadResult => {
76 val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
77 fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
78 })
79 }
80 val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
81 fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
82 val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
83 responseCallback)
84
85 // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
86 val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
87
88 // try to complete the request immediately, otherwise put it into the purgatory;
89 // this is because while the delayed fetch operation is being created, new requests
90 // may arrive and hence make this operation completable.
91 delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
92 }
93 }
Line 82 creates a DelayedFetch object, and line 91 tries to complete it immediately, watching it in the purgatory if it cannot be completed yet. The completion attempt calls DelayedFetch's tryComplete method: on an obvious error it completes right away, if enough data is available it completes, and if there is no data it simply returns false without calling forceComplete (which is exactly the scenario I want to discuss).
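What tryCompleteElseWatch does on the purgatory side can be boiled down to the sketch below (SimplePurgatory and its watcher map are my own simplification; the real purgatory also registers the operation with an expiration timer so it still completes when the fetch max wait fires):

import scala.collection.mutable

// Simplified sketch of DelayedOperationPurgatory.tryCompleteElseWatch (not the actual source).
class SimplePurgatory[T <: DelayedOperationSketch] {
  private val watchers = mutable.Map.empty[Any, mutable.ListBuffer[T]]

  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
    if (operation.tryComplete()) return true        // enough data or an error: respond right away
    watchKeys.foreach { key =>                      // otherwise watch it under every (topic, partition) key
      watchers.getOrElseUpdate(key, mutable.ListBuffer.empty[T]) += operation
    }
    // New data may have arrived while the watchers were being registered, so try once more;
    // if this still returns false, completion now depends on a later checkAndComplete(key)
    // for one of the keys, or on the expiration timer.
    operation.tryComplete()
  }

  // Called when something changes for `key` (e.g. the partition HW advanced): retry the watched operations.
  def checkAndComplete(key: Any): Unit =
    watchers.get(key).foreach(_.filterInPlace(op => !op.tryComplete()))
}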
1 /**
2 * A delayed fetch operation that can be created by the replica manager and watched
3 * in the fetch operation purgatory
4 */
5 class DelayedFetch(delayMs: Long,
6 fetchMetadata: FetchMetadata,
7 replicaManager: ReplicaManager,
8 quota: ReplicaQuota,
9 clientMetadata: Option[ClientMetadata],
10 responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit)
11 extends DelayedOperation(delayMs) {
12
13 /**
14 * The operation can be completed if:
15 *
16 * Case A: This broker is no longer the leader for some partitions it tries to fetch
17 * Case B: The replica is no longer available on this broker
18 * Case C: This broker does not know of some partitions it tries to fetch
19 * Case D: The partition is in an offline log directory on this broker
20 * Case E: This broker is the leader, but the requested epoch is now fenced
21 * Case F: The fetch offset locates not on the last segment of the log
22 * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
23 * Case H: A diverging epoch was found, return response to trigger truncation
24 * Upon completion, should return whatever data is available for each valid partition
25 */
26 override def tryComplete(): Boolean = {
27 var accumulatedSize = 0
28 fetchMetadata.fetchPartitionStatus.foreach {
29 case (topicPartition, fetchStatus) =>
30 val fetchOffset = fetchStatus.startOffsetMetadata
31 val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
32 try {
33 if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
34 val partition = replicaManager.getPartitionOrException(topicPartition)
35 val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader)
36
37 val endOffset = fetchMetadata.fetchIsolation match {
38 case FetchLogEnd => offsetSnapshot.logEndOffset
39 case FetchHighWatermark => offsetSnapshot.highWatermark
40 case FetchTxnCommitted => offsetSnapshot.lastStableOffset
41 }
42
43 // Go directly to the check for Case G if the message offsets are the same. If the log segment
44 // has just rolled, then the high watermark offset will remain the same but be on the old segment,
45 // which would incorrectly be seen as an instance of Case F.
46 if (endOffset.messageOffset != fetchOffset.messageOffset) {
47 if (endOffset.onOlderSegment(fetchOffset)) {
48 // Case F, this can happen when the new fetch operation is on a truncated leader
49 debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
50 return forceComplete()
51 } else if (fetchOffset.onOlderSegment(endOffset)) {
52 // Case F, this can happen when the fetch operation is falling behind the current segment
53 // or the partition has just rolled a new segment
54 debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
55 // We will not force complete the fetch request if a replica should be throttled.
56 if (!replicaManager.shouldLeaderThrottle(quota, partition, fetchMetadata.replicaId))
57 return forceComplete()
58 } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
59 // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
60 val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
61 if (!replicaManager.shouldLeaderThrottle(quota, partition, fetchMetadata.replicaId))
62 accumulatedSize += bytesAvailable
63 }
64 }
65
66 // Case H: If truncation has caused diverging epoch while this request was in purgatory, return to trigger truncation
67 fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch =>
68 val epochEndOffset = partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false)
69 if (epochEndOffset.errorCode != Errors.NONE.code()
70 || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET
71 || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
72 debug(s"Could not obtain last offset for leader epoch for partition $topicPartition, epochEndOffset=$epochEndOffset.")
73 return forceComplete()
74 } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
75 debug(s"Satisfying fetch $fetchMetadata since it has diverging epoch requiring truncation for partition " +
76 s"$topicPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
77 return forceComplete()
78 }
79 }
80 }
81 } catch {
82 case _: NotLeaderOrFollowerException => // Case A or Case B
83 debug(s"Broker is no longer the leader or follower of $topicPartition, satisfy $fetchMetadata immediately")
84 return forceComplete()
85 case _: UnknownTopicOrPartitionException => // Case C
86 debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchMetadata immediately")
87 return forceComplete()
88 case _: KafkaStorageException => // Case D
89 debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
90 return forceComplete()
91 case _: FencedLeaderEpochException => // Case E
92 debug(s"Broker is the leader of partition $topicPartition, but the requested epoch " +
93 s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $fetchMetadata immediately")
94 return forceComplete()
95 }
96 }
97
98 // Case G
99 if (accumulatedSize >= fetchMetadata.fetchMinBytes)
100 forceComplete()
101 else
102 false
103 }
104
105 override def onExpiration(): Unit = {
106 if (fetchMetadata.isFromFollower)
107 DelayedFetchMetrics.followerExpiredRequestMeter.mark()
108 else
109 DelayedFetchMetrics.consumerExpiredRequestMeter.mark()
110 }
111
112 /**
113 * Upon completion, read whatever data is available and pass to the complete callback
114 */
115 override def onComplete(): Unit = {
116 val logReadResults = replicaManager.readFromLocalLog(
117 replicaId = fetchMetadata.replicaId,
118 fetchOnlyFromLeader = fetchMetadata.fetchOnlyLeader,
119 fetchIsolation = fetchMetadata.fetchIsolation,
120 fetchMaxBytes = fetchMetadata.fetchMaxBytes,
121 hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
122 readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
123 clientMetadata = clientMetadata,
124 quota = quota)
125
126 val fetchPartitionData = logReadResults.map { case (tp, result) =>
127 val isReassignmentFetch = fetchMetadata.isFromFollower &&
128 replicaManager.isAddingReplica(tp, fetchMetadata.replicaId)
129
130 tp -> result.toFetchPartitionData(isReassignmentFetch)
131 }
132
133 responseCallback(fetchPartitionData)
134 }
135 }
2. Partition followers keep fetching from the leader and report their replication progress. When the partition leader sees the partition's HW (high watermark) advance, new messages have become visible to consumers, and it is time to trigger the blocked DelayedFetch operations.
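The rule that governs the HW is simple and worth spelling out, because it is what connects follower fetches to consumer visibility. A minimal sketch of the idea (illustrative, not the actual Partition code):

object HighWatermarkRule {
  // The leader HW is the minimum LEO across the in-sync replicas, so it can only
  // advance after followers report new log end offsets on their fetch requests.
  def maybeIncrement(currentHw: Long, isrLogEndOffsets: Seq[Long]): Option[Long] = {
    val newHw = isrLogEndOffsets.min
    if (newHw > currentHw) Some(newHw) else None
  }
}

// e.g. HighWatermarkRule.maybeIncrement(currentHw = 100L, isrLogEndOffsets = Seq(105L, 103L, 101L)) == Some(101L)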
1 object Partition extends KafkaMetricsGroup {
2 def apply(topicPartition: TopicPartition,
3 time: Time,
4 configRepository: ConfigRepository,
5 replicaManager: ReplicaManager): Partition = {
6
7 val isrChangeListener = new IsrChangeListener {
8 override def markExpand(): Unit = {
9 replicaManager.isrExpandRate.mark()
10 }
11
12 override def markShrink(): Unit = {
13 replicaManager.isrShrinkRate.mark()
14 }
15
16 override def markFailed(): Unit = replicaManager.failedIsrUpdatesRate.mark()
17 }
18
19 val delayedOperations = new DelayedOperations(
20 topicPartition,
21 replicaManager.delayedProducePurgatory,
22 replicaManager.delayedFetchPurgatory,
23 replicaManager.delayedDeleteRecordsPurgatory)
24
25 new Partition(topicPartition,
26 replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs,
27 interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
28 localBrokerId = replicaManager.config.brokerId,
29 time = time,
30 isrChangeListener = isrChangeListener,
31 delayedOperations = delayedOperations,
32 metadataCache = replicaManager.metadataCache,
33 logManager = replicaManager.logManager,
34 alterIsrManager = replicaManager.alterIsrManager)
35 }
36
37 def removeMetrics(topicPartition: TopicPartition): Unit = {
38 val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString)
39 removeMetric("UnderReplicated", tags)
40 removeMetric("UnderMinIsr", tags)
41 removeMetric("InSyncReplicasCount", tags)
42 removeMetric("ReplicasCount", tags)
43 removeMetric("LastStableOffsetLag", tags)
44 removeMetric("AtMinIsr", tags)
45 }
46 }
On line 19, the Partition's delayedOperations field wraps delayedFetchPurgatory. Followers keep fetching from the leader and report their LEO (log end offset) with each fetch; when the leader determines that the HW has grown, new messages have become visible to consumers.
// kafka.cluster.Partition#updateFollowerFetchState
1 /**
2 * Update the follower's state in the leader based on the last fetch request. See
3 * [[Replica.updateFetchState()]] for details.
4 *
5 * @return true if the follower's fetch state was updated, false if the followerId is not recognized
6 */
7 def updateFollowerFetchState(followerId: Int,
8 followerFetchOffsetMetadata: LogOffsetMetadata,
9 followerStartOffset: Long,
10 followerFetchTimeMs: Long,
11 leaderEndOffset: Long): Boolean = {
12 getReplica(followerId) match {
13 case Some(followerReplica) =>
14 // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
15 val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
16 val prevFollowerEndOffset = followerReplica.logEndOffset
17 followerReplica.updateFetchState(
18 followerFetchOffsetMetadata,
19 followerStartOffset,
20 followerFetchTimeMs,
21 leaderEndOffset)
22
23 val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
24 // check if the LW of the partition has incremented
25 // since the replica's logStartOffset may have incremented
26 val leaderLWIncremented = newLeaderLW > oldLeaderLW
27
28 // Check if this in-sync replica needs to be added to the ISR.
29 maybeExpandIsr(followerReplica, followerFetchTimeMs)
30
31 // check if the HW of the partition can now be incremented
32 // since the replica may already be in the ISR and its LEO has just incremented
33 val leaderHWIncremented = if (prevFollowerEndOffset != followerReplica.logEndOffset) {
34 // the leader log may be updated by ReplicaAlterLogDirsThread so the following method must be in lock of
35 // leaderIsrUpdateLock to prevent adding new hw to invalid log.
36 inReadLock(leaderIsrUpdateLock) {
37 leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
38 }
39 } else {
40 false
41 }
42
43 // some delayed operations may be unblocked after HW or LW changed
44 if (leaderLWIncremented || leaderHWIncremented)
45 tryCompleteDelayedRequests()
46
47 debug(s"Recorded replica $followerId log end offset (LEO) position " +
48 s"${followerFetchOffsetMetadata.messageOffset} and log start offset $followerStartOffset.")
49 true
50
51 case None =>
52 false
53 }
54 }
On line 44, the partition leader has determined that the partition's HW (or LW) advanced and that some DelayedFetch operations may now be completable. This eventually triggers DelayedFetch's onComplete method, which reads the messages and returns them to the client; the sketch after the snippet below shows how the completion is keyed.
// kafka.cluster.Partition#tryCompleteDelayedRequests
/**
* Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
*/
private def tryCompleteDelayedRequests(): Unit = delayedOperations.checkAndCompleteAll()
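checkAndCompleteAll fans that notification back out to the fetch purgatory under the same per-partition key that fetchMessages used when it watched the DelayedFetch (delayedFetchKeys, lines 86 and 91 of the first listing). A simplified sketch of that fan-out, assuming Kafka core's internal classes (this is not the actual DelayedOperations source):

import org.apache.kafka.common.TopicPartition
import kafka.server.{DelayedFetch, DelayedOperationPurgatory, TopicPartitionOperationKey}

// Sketch of how the HW-change notification reaches a parked DelayedFetch.
class DelayedOperationsSketch(topicPartition: TopicPartition,
                              fetchPurgatory: DelayedOperationPurgatory[DelayedFetch]) {
  def checkAndCompleteAll(): Unit = {
    // Same key the delayed fetch was watched under, so only operations waiting on this
    // partition get re-checked; each one runs tryComplete() again and, once the readable
    // bytes reach fetchMinBytes (Case G), calls forceComplete() -> onComplete().
    fetchPurgatory.checkAndComplete(TopicPartitionOperationKey(topicPartition))
  }
}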