第一版:
解决思路:当消费者消费完一条消息后,就提交一次消息偏移量,然后业务处理需要保持幂等性。
这种效率也许不高、但是安全、等发现了更好的解决方案再来思考记录
代码实现
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author SUN
* @date 22/04/2022
*/
public class ConsumerTest {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static final AtomicBoolean running = new AtomicBoolean(true);
public static Properties initConfig() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 取消自动提交
properties.put("enable.auto.commit", false);
// 所属组名称
properties.put( "group.id", groupId);
properties.put("client.id", "consumer.client.id.demo");
return properties;
}
public static void main(String[] args) {
Properties properties = initConfig();
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(List.of(topic));
while (running.get()) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 处理业务:需保证 幂等性
handler(consumerRecord);
// 消费完后同步提交
kafkaConsumer.commitSync();
}
}
kafkaConsumer.close();
}
public static void handler(ConsumerRecord<String, String> consumerRecord){
String topic = consumerRecord.topic();
int partition = consumerRecord.partition();
String value = consumerRecord.value();
String key = consumerRecord.key();
}