0
点赞
收藏
分享

微信扫一扫

Kafka消费者 消息丢失、重复消费问题解决方案

九月的栩 2022-04-23 阅读 78
kafka

第一版:

解决思路:消费者每处理完一条消息,就同步提交一次该消息的偏移量;同时业务处理需要保证幂等性,以应对可能出现的重复消费。

这种方式效率也许不高,但是安全;等发现了更好的解决方案再来补充记录。

代码实现


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author SUN
 * @date 22/04/2022
 */
public class ConsumerTest {

    // Broker address handed to the client as its bootstrap server list.
    public static final String brokerList = "localhost:9092";
    // Topic that main subscribes to.
    public static final String topic = "topic-demo";
    // Value used for the consumer's group.id setting.
    public static final String groupId = "group.demo";
    // Loop flag checked by main before every poll; nothing in the visible source sets it to false.
    public static final AtomicBoolean running = new AtomicBoolean(true);

    /**
     * Builds the client configuration for the demo consumer.
     *
     * <p>Auto-commit is switched off so that offsets are committed only by the
     * explicit commit call made after a record has been handled.
     *
     * @return properties suitable for constructing a {@code KafkaConsumer<String, String>}
     */
    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        // A consumer needs *deserializers* (key.deserializer / value.deserializer);
        // supplying the producer-side serializer keys leaves these required
        // settings missing and the consumer cannot be constructed.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Disable auto-commit; offsets are committed manually.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Consumer group this client belongs to.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
        return properties;
    }

    /**
     * Polls the demo topic and handles records one at a time, committing the
     * offset of each record synchronously right after it has been handled.
     *
     * <p>Because the commit happens after handling, a failure between the two
     * steps causes the record to be delivered again; {@link #handler} must
     * therefore be idempotent.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = initConfig();
        // try-with-resources closes the consumer even when handling or committing throws.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            kafkaConsumer.subscribe(List.of(topic));

            while (running.get()) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

                    // Business processing: must be idempotent.
                    handler(consumerRecord);

                    // Commit only this record. The no-argument commitSync() commits the
                    // offsets of the whole batch returned by the last poll, which would
                    // mark records that have not been handled yet as consumed.
                    // The committed value is the offset of the NEXT record to read.
                    TopicPartition partition =
                            new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                    OffsetAndMetadata nextOffset = new OffsetAndMetadata(consumerRecord.offset() + 1);
                    kafkaConsumer.commitSync(Map.of(partition, nextOffset));
                }
            }
        }
    }

    /**
     * Placeholder for the business logic applied to a single record.
     *
     * <p>Currently it only reads the record's key, value, topic and partition
     * into locals and does nothing further with them.
     *
     * @param consumerRecord the record to process
     */
    public static void handler(ConsumerRecord<String, String> consumerRecord){
        final String recordKey = consumerRecord.key();
        final String recordValue = consumerRecord.value();
        final String sourceTopic = consumerRecord.topic();
        final int sourcePartition = consumerRecord.partition();
    }
举报

相关推荐

0 条评论