Spring Kafka error handling

梦为马 2022-06-27

In this article, I will discuss in detail how Spring Kafka handles consumer errors. This turned out to be far more complex than I expected, so I think it is worth a blog post to share some details on the topic.

Unlike the standard Kafka Java client, where the consumer logic sits in an infinite loop that periodically calls KafkaConsumer.poll(timeoutMs), Spring Kafka recommends putting the consumer logic in a method of an ordinary class and annotating that method with @KafkaListener. E.g. (in Kotlin):

class PaymentRequestHandler {
    @KafkaListener(topics = ["\${payment.kafka.internal.paymentTopicName}"], containerFactory = "defaultKafkaListenerContainerFactory")
    fun consume(consumerRecord: ConsumerRecord<String, ByteArray>) {
        // consumer logic here
    }
}

Internally, Spring Kafka still uses a poll loop, in the run method of KafkaMessageListenerContainer.ListenerConsumer:

public void run() {
    while (isRunning()) {
        // ....
        try {
            pollAndInvoke();
        }
        catch (@SuppressWarnings(UNUSED) WakeupException e) {
            // Ignore, we're stopping or applying immediate foreign acks
        }
        catch (NoOffsetForPartitionException nofpe) {
            this.fatalError = true;
            ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");
            exitThrowable = nofpe;
            break;
        }
        // .... handle some other specific exception types
        catch (Exception e) {
            handleConsumerException(e);
        }
    }
    wrapUp(exitThrowable);
}

Let us now go through the error scenarios:

  1. Kafka listener method throws an exception and records are not processed in batch

Any exception thrown in the Kafka listener method is wrapped in an org.springframework.kafka.listener.ListenerExecutionFailedException by the org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.onMessage method. This exception class is a subclass of RuntimeException, so it is caught by the catch block in the doInvokeRecordListener method of KafkaMessageListenerContainer.ListenerConsumer:

private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record, // NOSONAR
        Iterator<ConsumerRecord<K, V>> iterator) {

    Object sample = startMicrometerSample();
    try {
        invokeOnMessage(record);
        successTimer(sample);
    }
    catch (RuntimeException e) {
        failureTimer(sample);
        boolean acked = this.containerProperties.isAckOnError() && !this.autoCommit && this.producer == null;
        if (acked) {
            ackCurrent(record);
        }
        if (this.errorHandler == null) {
            throw e;
        }
        try {
            invokeErrorHandler(record, iterator, e);
        }
        catch (KafkaException ke) {
            ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
            return ke;
        }
        // ... handle other exceptions
    }
}

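The invokeErrorHandler method delegates to the ErrorHandler injected into the KafkaMessageListenerContainer. By default, the error handler class is org.springframework.kafka.listener.SeekToCurrentErrorHandler. The handler is chosen in the ListenerConsumer constructor, through determineErrorHandler for a record listener: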

ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
    if (this.isBatchListener) {
        validateErrorHandler(true);
        this.errorHandler = new LoggingErrorHandler();
        this.batchErrorHandler = determineBatchErrorHandler(errHandler);
    }
    else {
        validateErrorHandler(false);
        this.errorHandler = determineErrorHandler(errHandler);
        this.batchErrorHandler = new BatchLoggingErrorHandler();
    }
}

SeekToCurrentErrorHandler resets the consumer offset to that of the failed record, so that the record is fetched again by the next poll. The org.springframework.kafka.listener.SeekUtils.doSeeks method calls org.springframework.kafka.listener.FailedRecordTracker.skip to test whether the failed record should be skipped. If not, the record's partition and offset are added to a map of partitions to seek, and kafkaConsumer.seek(partition, offset) is then called from the org.springframework.kafka.listener.SeekUtils.seekPartitions method.

By default, each failed record is delivered at most 10 times (the first attempt plus 9 retries). Why is this so? The key logic is in the FailedRecordTracker.skip method. FailedRecordTracker keeps track of the attempts internally and checks them against the maximum number of retries of the BackOff object passed to SeekToCurrentErrorHandler's constructor:

// In SeekToCurrentErrorHandler:
public SeekToCurrentErrorHandler() {
    this(null, SeekUtils.DEFAULT_BACK_OFF);
}

// In SeekUtils:
public static final int DEFAULT_MAX_FAILURES = 10;
/**
 * The default back off - a {@link FixedBackOff} with 0 interval and
 * {@link #DEFAULT_MAX_FAILURES} - 1 retry attempts.
 */
public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0, DEFAULT_MAX_FAILURES - 1);

Once the maximum number of retries is reached, the next back-off returned is BackOffExecution.STOP, and skip returns true (simplified excerpt):

boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
    // ... lookup of the failedRecord for this partition/offset is elided
    long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
    if (nextBackOff != BackOffExecution.STOP) {
        try {
            Thread.sleep(nextBackOff);
        }
        catch (@SuppressWarnings("unused") InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false; // retry: the caller will seek back to this record
    }
    else {
        // attempt recovery
        return true;
    }
}
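
The counting can be checked with a small stand-alone model. This is only a sketch: BackOffModel, nextBackOff and deliveriesUntilRecovery are names made up for illustration, and the class imitates the behavior described above (return the interval while retries remain, then STOP) instead of using Spring's own classes.

```java
/** Stand-alone model of a fixed back-off; hypothetical, not Spring's FixedBackOff. */
class BackOffModel {
    static final long STOP = -1L; // sentinel playing the role of BackOffExecution.STOP

    private final long intervalMs;
    private final long maxRetries;
    private long attempts = 0;

    BackOffModel(long intervalMs, long maxRetries) {
        this.intervalMs = intervalMs;
        this.maxRetries = maxRetries;
    }

    /** Returns the interval while retries remain, then STOP. */
    long nextBackOff() {
        attempts++;
        return attempts <= maxRetries ? intervalMs : STOP;
    }

    /** Counts how often an always-failing record is delivered before recovery. */
    static int deliveriesUntilRecovery(long maxRetries) {
        BackOffModel backOff = new BackOffModel(0L, maxRetries);
        int deliveries = 0;
        while (true) {
            deliveries++; // the listener is invoked and fails
            if (backOff.nextBackOff() == STOP) {
                return deliveries; // skip(...) would return true here
            }
            // otherwise the handler seeks back and the record is polled again
        }
    }

    public static void main(String[] args) {
        System.out.println(deliveriesUntilRecovery(9)); // default back-off: prints 10
        System.out.println(deliveriesUntilRecovery(3)); // FixedBackOff(0, 3): prints 4
    }
}
```

In other words, the second constructor argument of FixedBackOff is the number of retries, so the total number of deliveries is always one higher.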

To change the default behavior, we can inject a customized error handler into the KafkaListenerContainerFactory and give it a different back-off:

@Bean
open fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<String, ByteArray>): ConcurrentKafkaListenerContainerFactory<*, *>? {
    val factory = ConcurrentKafkaListenerContainerFactory<String, ByteArray>()
    factory.consumerFactory = consumerFactory
    // at most 3 retries (4 delivery attempts in total), with no delay in between
    val backOff = FixedBackOff(0L, 3L)
    val errorHandler = SeekToCurrentErrorHandler(backOff)
    factory.setErrorHandler(errorHandler)
    return factory
}

  2. Kafka listener method executes for a long time

In this case, we also observe that the record is retried, and the retries continue for as long as the timeout keeps occurring.

So the first question is: what causes the retry? The log gives a pretty clear clue:

org.apache.kafka.clients.consumer.CommitFailedException: 
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.
You can address this either by increasing max.poll.interval.ms
or by reducing the maximum size of batches returned in poll() with max.poll.records.

And the stack trace:

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar:2.6.7]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504) ~[kafka-clients-2.6.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2396) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2391) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2377) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2191) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1149) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.7.jar:2.6.7]

This is how Kafka deals with a livelock situation: when the time between two poll() calls of a consumer exceeds max.poll.interval.ms (for example because the listener runs for too long), the consumer is considered dead. To ensure that consumption makes progress, it is removed from the consumer group. So when the consumer then tries to commit its offsets (via commitSync in the stack trace above), it is no longer part of the consumer group and its partitions have been reassigned to another consumer. Consequently, the commit attempt fails, and this exception is fatal and non-retriable.

At the code level: the pollAndInvoke method of KafkaMessageListenerContainer.ListenerConsumer calls processCommits, where the CommitFailedException is thrown. The offset commit does not succeed, and since the consumer group has rebalanced, the next poll attempt retrieves the last committed offset from the broker again, which is still the old one. That is why the same ConsumerRecord is consumed again. Note that the root causes of the retry in the first and second case are different: in the first case, the retry is triggered by seeking the consumer back to the failed offset, whereas in the second case it is triggered by the consumer position being reset from the offset held by the broker. The debugger call stack below (innermost frame first) shows the position being reset while partitions are assigned after the rejoin:

seekValidated:895, SubscriptionState$TopicPartitionState (org.apache.kafka.clients.consumer.internals)
seekUnvalidated:903, SubscriptionState$TopicPartitionState (org.apache.kafka.clients.consumer.internals)
access$100:729, SubscriptionState$TopicPartitionState (org.apache.kafka.clients.consumer.internals)
maybeSeekUnvalidated:398, SubscriptionState (org.apache.kafka.clients.consumer.internals)
resetOffsetIfNeeded:727, Fetcher (org.apache.kafka.clients.consumer.internals)
onSuccess:751, Fetcher$2 (org.apache.kafka.clients.consumer.internals)
onSuccess:739, Fetcher$2 (org.apache.kafka.clients.consumer.internals)
fireSuccess:169, RequestFuture (org.apache.kafka.clients.consumer.internals)
complete:129, RequestFuture (org.apache.kafka.clients.consumer.internals)
handleListOffsetResponse:1074, Fetcher (org.apache.kafka.clients.consumer.internals)
access$2800:130, Fetcher (org.apache.kafka.clients.consumer.internals)
onSuccess:981, Fetcher$5 (org.apache.kafka.clients.consumer.internals)
onSuccess:976, Fetcher$5 (org.apache.kafka.clients.consumer.internals)
onSuccess:206, RequestFuture$1 (org.apache.kafka.clients.consumer.internals)
fireSuccess:169, RequestFuture (org.apache.kafka.clients.consumer.internals)
complete:129, RequestFuture (org.apache.kafka.clients.consumer.internals)
fireCompletion:602, ConsumerNetworkClient$RequestFutureCompletionHandler (org.apache.kafka.clients.consumer.internals)
firePendingCompletedRequests:412, ConsumerNetworkClient (org.apache.kafka.clients.consumer.internals)
poll:297, ConsumerNetworkClient (org.apache.kafka.clients.consumer.internals)
poll:236, ConsumerNetworkClient (org.apache.kafka.clients.consumer.internals)
poll:227, ConsumerNetworkClient (org.apache.kafka.clients.consumer.internals)
position:1760, KafkaConsumer (org.apache.kafka.clients.consumer)
position:1718, KafkaConsumer (org.apache.kafka.clients.consumer)
seekPartitions:1033, KafkaMessageListenerContainer$ListenerConsumer (org.springframework.kafka.listener)
access$3800:462, KafkaMessageListenerContainer$ListenerConsumer (org.springframework.kafka.listener)
onPartitionsAssigned:2643, KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener (org.springframework.kafka.listener)
invokePartitionsAssigned:293, ConsumerCoordinator (org.apache.kafka.clients.consumer.internals)
onJoinComplete:430, ConsumerCoordinator (org.apache.kafka.clients.consumer.internals)
joinGroupIfNeeded:440, AbstractCoordinator (org.apache.kafka.clients.consumer.internals)
ensureActiveGroup:359, AbstractCoordinator (org.apache.kafka.clients.consumer.internals)
poll:513, ConsumerCoordinator (org.apache.kafka.clients.consumer.internals)
updateAssignmentMetadataIfNeeded:1268, KafkaConsumer (org.apache.kafka.clients.consumer)
poll:1230, KafkaConsumer (org.apache.kafka.clients.consumer)
poll:1210, KafkaConsumer (org.apache.kafka.clients.consumer)
doPoll:1271, KafkaMessageListenerContainer$ListenerConsumer (org.springframework.kafka.listener)
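
The difference between the two retry causes can be illustrated with a toy model of a single partition. This is a sketch under simplifying assumptions, not the Kafka client API: OffsetReplayModel and all of its methods are invented for illustration, there is one consumer and one partition, and a rejoin always restarts from the committed offset.

```java
/** Toy model of one partition's offsets; hypothetical, not the Kafka client API. */
class OffsetReplayModel {
    private long committed = 0; // last offset successfully committed to the broker
    private long position = 0;  // next offset the consumer will fetch
    private boolean inGroup = true;

    /** Returns the offset handed to the listener. */
    long poll() {
        if (!inGroup) {
            // Rejoining triggers a rebalance; the position restarts from the committed offset.
            inGroup = true;
            position = committed;
        }
        return position++;
    }

    /** Case 1: the error handler seeks back to the failed record. */
    void seek(long offset) {
        position = offset;
    }

    /** Case 2: processing overran max.poll.interval.ms, so the member was removed. */
    void exceedMaxPollInterval() {
        inGroup = false;
    }

    /** Returns false when the commit is rejected, as with CommitFailedException. */
    boolean commit() {
        if (!inGroup) {
            return false;
        }
        committed = position;
        return true;
    }

    public static void main(String[] args) {
        OffsetReplayModel byTimeout = new OffsetReplayModel();
        long first = byTimeout.poll();
        byTimeout.exceedMaxPollInterval();
        boolean ok = byTimeout.commit();
        long second = byTimeout.poll();
        System.out.println(first + " " + ok + " " + second); // prints "0 false 0"
    }
}
```

In the timeout case nothing in the consumer asks for the old record again; it comes back simply because the commit never reached the broker.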

The second question is: why does the back-off setting not take effect, so that the retry continues for as long as the timeout occurs? In the run method of KafkaMessageListenerContainer.ListenerConsumer, there is a general error handling block which still delegates to org.springframework.kafka.listener.SeekToCurrentErrorHandler to handle the exception. However, the key difference from the previous case is that no consumer record is passed to the error handler (note the Collections.emptyList() argument). See the code snippet below:

public void run() {
    while (isRunning()) {
        try {
            pollAndInvoke();
        }
        catch (Exception e) {
            handleConsumerException(e);
        }
    }
}

protected void handleConsumerException(Exception e) {
    if (e instanceof RetriableCommitFailedException) {
        this.logger.error(e, "Commit retries exhausted");
        return;
    }
    try {
        if (!this.isBatchListener && this.errorHandler != null) {
            this.errorHandler.handle(e, Collections.emptyList(), this.consumer,
                    KafkaMessageListenerContainer.this.thisOrParentContainer);
        }
        else if (this.isBatchListener && this.batchErrorHandler != null) {
            this.batchErrorHandler.handle(e, new ConsumerRecords<K, V>(Collections.emptyMap()), this.consumer,
                    KafkaMessageListenerContainer.this.thisOrParentContainer);
        }
        else {
            this.logger.error(e, "Consumer exception");
        }
    }
    catch (Exception ex) {
        this.logger.error(ex, "Consumer exception");
    }
}

Consequently, the org.springframework.kafka.listener.SeekUtils.seekOrRecover method throws an IllegalStateException straight away, before FailedRecordTracker and its back-off are ever consulted:

public static void seekOrRecover(Exception thrownException, List<ConsumerRecord<?, ?>> records,
        Consumer<?, ?> consumer, MessageListenerContainer container, boolean commitRecovered,
        BiPredicate<ConsumerRecord<?, ?>, Exception> skipPredicate, LogAccessor logger, Level level) {

    if (ObjectUtils.isEmpty(records)) {
        if (thrownException instanceof SerializationException) {
            throw new IllegalStateException("This error handler cannot process 'SerializationException's directly; "
                    + "please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key "
                    + "deserializer", thrownException);
        }
        else {
            throw new IllegalStateException("This error handler cannot process '"
                    + thrownException.getClass().getName()
                    + "'s; no record information is available", thrownException);
        }
    }
    // ... handling of the non-empty case is elided
}
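
The following stand-alone sketch mirrors the shape of the two code paths to show why no attempt counter ever advances in the timeout case. It is a model under stated assumptions, not Spring Kafka code: ErrorPathModel and its members are hypothetical names, and the trackedFailures field merely stands in for the state kept by FailedRecordTracker.

```java
import java.util.List;

/** Toy model of the two error paths; hypothetical names, not Spring Kafka classes. */
class ErrorPathModel {
    private int trackedFailures = 0; // stands in for FailedRecordTracker state

    /** Same shape as seekOrRecover: without records there is nothing to track or seek. */
    void handle(Exception thrown, List<Long> remainingOffsets) {
        if (remainingOffsets.isEmpty()) {
            throw new IllegalStateException("no record information is available", thrown);
        }
        trackedFailures++; // only reached when a failed record is known
    }

    int trackedFailures() {
        return trackedFailures;
    }

    /** Runs the container-level path n times and reports how many failures were tracked. */
    static int trackedAfterCommitFailures(int n) {
        ErrorPathModel handler = new ErrorPathModel();
        for (int i = 0; i < n; i++) {
            try {
                handler.handle(new RuntimeException("commit failed"), List.of());
            }
            catch (IllegalStateException logged) {
                // handleConsumerException only logs this; the poll loop then continues
            }
        }
        return handler.trackedFailures();
    }

    public static void main(String[] args) {
        System.out.println(trackedAfterCommitFailures(100)); // prints 0
    }
}
```

However many times the commit fails, the counter stays at zero, so the back-off can never reach its limit.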

So far, I have not found a good way to control the retry behavior in this case. Increasing the value of "max.poll.interval.ms" might work, but if you cannot give a good estimate of the upper bound of your message handler's execution time, this is still not a reliable remedy.
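
To get a feeling for the numbers, it helps to work out the time budget per record. The sketch below is plain arithmetic and rests on assumptions: it uses the Kafka consumer defaults of max.poll.interval.ms = 300000 and max.poll.records = 500, it assumes that every record of a batch is handled sequentially before the next poll(), and the class and method names are made up for illustration.

```java
/** Back-of-the-envelope helper; hypothetical names, plain arithmetic only. */
class PollBudget {
    /** Average time each record may take if a full batch must finish before the next poll(). */
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    /** Largest batch that still fits, given a worst-case handler time per record. */
    static long maxRecordsFor(long maxPollIntervalMs, long worstCaseHandlerMs) {
        return maxPollIntervalMs / worstCaseHandlerMs;
    }

    public static void main(String[] args) {
        System.out.println(perRecordBudgetMs(300_000L, 500)); // prints 600
        System.out.println(maxRecordsFor(300_000L, 2_000L));  // prints 150
    }
}
```

With the defaults, a handler that averages more than 600 ms per record on a full batch already risks exceeding the interval, which is why lowering max.poll.records is the other knob mentioned in the log message.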

