0
点赞
收藏
分享

微信扫一扫

Go-Gin-Example 第九部分 实现redis缓存

一天清晨 04-04 18:30 阅读 1

【JAVA】生产环境kafka重复消费问题记录

问题描述

业务系统每周都有定时任务在跑,由于是大任务因此采用分而治之思想将其拆分为多个分片小任务采用kafka异步队列消费的形式来减少服务器压力,每个小任务都会调用后台的c++算法,调用完成之后便会回写数据库的成功次数。今天观测到定时任务的分片小任务存在被重复消费的问题,表现就在于数据库中存在多余的成功次数,比如本来大任务分了10个分片,但回写了25次任务处理成功,这是不合理的,因此问题的排查便开始了。

问题排查

一般这种"回写次数"大于"任务总数"的情况,除了代码逻辑问题之外,非常有可能是因为系统出现了重复消费的问题,多次的消费导致"执行成功回写"这块逻辑被执行了多次。
后台日志确实也显示,同样的消息被消费了多次:

2024-03-31 03:17:51.926	
[2024-03-31 03:17:51:674 CST] [] [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  [] 更新分片状态成功, processId: 16, shardId: 57
2024-03-31 03:17:51.675	
[2024-03-31 03:17:51:662 CST] [] [] to update shard status, processId: 16, shardId: 57


2024-03-31 01:24:10.898	
[2024-03-31 01:24:10:803 CST] [] [] 更新分片状态成功, processId: 16, shardId: 57
2024-03-31 01:24:09.395	
[2024-03-31 01:24:09:302 CST] [] [] to update shard status, processId: 16, shardId: 57

经过后台错误日志排查,发现后台报了许多如下异常:

[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatResponseHandler.handle(AbstractCoordinator.java:1054)]: 
[Consumer clientId=consumer-consumer_group_prod-6, groupId=consumer_group_prod] 
Attempt to heartbeat failed since group is rebalancing

通过日志可以判断,系统出现了 消费者重平衡 rebalance 的问题,一般消费者组进行重平衡,可能会是以下几种原因:

  • 消费者组引入了新topic
  • 消费者组有成员进组
  • 消费者组有成员离组

由于线上系统周末没有人去新增话题或者新加实例来增加消费者,因此我往 消费者异常离组这个方向进行排查。因为当消费者离组,并重平衡重新加入消费者组后,之前 未消费的消息又会被拉取一次,符合当前的表现。

此时我查看后台,确实存在消息堆积的现象,同时根据后台c++算法的日志,发现有许多分片在算法中pending了大概两三小时,有的甚至迟迟没跑出结果。并且,我检查了目前系统的kafka配置,其中有两项,就是导致重复消费的元凶:

spring.kafka.consumer.max-poll-interval-ms=7200000
spring.kafka.consumer.max-poll-records=5

这两项配置的含义是,消费者消费间隔最高为7200秒(即2小时),而一次性消费最多会拉取5条消息。

而代码里面的消费逻辑如下:

// 代码已脱敏处理
for (InputReq inputReq : reqList) {
    boolean shardSuccess = false;

    try {
        // 调用算法A
        algoBiz.callAlgoA(inputReq.getProcessId(), inputReq.getShardId(), inputReq.getProcVersion());

        // 调用算法B
        algoBiz.callAlgoB(inputReq.getProcessId(), inputReq.getShardId(),  inputReq.getProcVersion());

        shardSuccess = true ;
    }
    catch (Exception e) {
        log.error("[] error: {}", e.getMessage());
    }

    // 更新分片状态
    Boolean updateProcShard = inputReq.getToUpdateProcShard();
    algoBiz.updateShardResult(inputReq.getProcessId(), inputReq.getShardId(), updateProcShard, shardSuccess);
}

ack.acknowledge();

代码会消费一次性拉取的最多五条消息之后,才会ack队列协调者,此时如果五条消息处理的总时间超过两小时,那么就会触发消费组重平衡,该消费者重新入队。而结合这周由于后台算法更新,运行时间大大减慢,从而导致了消息重平衡而最终导致了重复消费。

处理方式

由于一时半会儿算法优化还没那么快上线,在调度模块我们就先通过配置进行优化,先解决头疼的重复消费问题。我们先确保程序逻辑超时时间足够长,同时单次消费仅拉取一条消息即可,这样可以大大减少由于算法运行过慢导致的重平衡问题。

spring.kafka.consumer.max-poll-interval-ms=10800000
spring.kafka.consumer.max-poll-records=1
举报

相关推荐

0 条评论