消息中间件之Kafka消费者
- 一、`Consumer`初始化配置
- 二、测试简单的消费消息
- 三、手动提交
- 四、`Consumer Group`
- 六、单个`Partition`提交
- 七、`Consumer`多线程并发处理
- 八、`Consumer`设置`offset`
- 九、`Consumer`限流操作
一、Consumer
初始化配置
/**
* producer 配置
* @return
*/
@Bean
public KafkaProducer<String, String> kafkaProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.26:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
return new KafkaProducer<>(properties);
}
二、测试简单的消费消息
这里为了不需要等待,我们异步的开启一个线程去消费消息。
/**
* @author long
*/
@Slf4j
@RestController
@RequestMapping("/consumer")
public class ConsumerController {
@Autowired
private KafkaConsumer kafkaConsumer;
/**
* 简单消费者
*/
@GetMapping("/simple")
public String simple(@RequestParam("topic_name") String topicName) {
kafkaConsumer.subscribe(Arrays.asList(topicName));
new Thread(() -> {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records) {
log.info("消费信息:patition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
}
}
}, "获取消息线程 => ").start();
return "success";
}
}
启动项目之后,先访问localhost:8080/consumer/simple?topic_name=new_long_topic_34
这个接口,让这个异步的线程启动起来。
这时运行发送消息的接口:localhost:8080/producer/send2?topicName=new_long_topic_34&num=100
,发送一百条消息。
我们上面的这种消费方法,使用的是自动提交的方法。实际中使用不推荐使用。
三、手动提交
/**
* 手动提交
* @param topicName
* @return
*/
@GetMapping("/commit")
public String commit(@RequestParam("topic_name") String topicName) {
kafkaConsumer.subscribe(Arrays.asList(topicName));
new Thread(() -> {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records) {
// TODO 处理数据保存到数据库
log.info("消费信息入库:patition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
// TODO 处理失败的话,则回滚,不要提交offset
}
// TODO 成功的话,手动提交offset
kafkaConsumer.commitAsync();
}
}, "获取消息线程 => ").start();
return "success";
}
四、Consumer Group
这里面我们需要注意:
- 单个分区的消息只能由
ConsumerGroup
中某个Consumer
消费 -
ConsumerGroup
中的一个Consumer
可以对应多个分片 - 一个分片不可以被
ConsumerGroup
中的多个Consumer
消费,多出的只能闲置 -
Consumer
从Partition
中消费消息是顺序,默认从开头开始消费 - 单个
ConsumerGroup
会消费所有的Partition
中的消息。
六、单个Partition
提交
可以控制单个partition
手动提交,方便使用多线程进行消息消费;并且可以对多个partition
提交进行控制。
@GetMapping("/single")
public String singlePartition(@RequestParam("topic_name") String topicName) {
kafkaConsumer.subscribe(Arrays.asList(topicName));
new Thread(() -> {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> recordList = records.records(partition);
for (ConsumerRecord<String, String> record : recordList) {
log.info("消费信息入库:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
}
long offset = recordList.get(recordList.size() - 1).offset();
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.partitions().size());
offsetMap.put(partition, new OffsetAndMetadata(offset + 1));
kafkaConsumer.commitSync(offsetMap);
}
}, "获取消息线程<single> => ").start();
return "success";
}
针对上面的代码,可以发现对于只包含一个partition
的时候,上面的写法是有点啰嗦的,我们还有另一种写法:
@GetMapping("/single_2")
public String singlePartition2(@RequestParam("topic_name") String topicName) {
TopicPartition partition = new TopicPartition(topicName, 0);
// 订阅topic中的某一个partition
kafkaConsumer.assign(Arrays.asList(partition));
new Thread(() -> {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
List<ConsumerRecord<String, String>> recordList = records.records(partition);
for (ConsumerRecord<String, String> record : recordList) {
log.info("消费信息入库:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
}
long offset = recordList.get(recordList.size() - 1).offset();
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.partitions().size());
offsetMap.put(partition, new OffsetAndMetadata(offset + 1));
kafkaConsumer.commitSync(offsetMap);
}, "获取消息线程<single> => ").start();
return "success";
}
七、Consumer
多线程并发处理
关于多线程并发处理,常用的是这样两种线程模型:
第一种是:对于数据进行异步处理,适用于对于数据一致性要求不高,不是用于处理业务,这种情况就是为了快速消费,不管是不是成功,偏向于日志这种的;
第二种是:一个partition
对应一个consumer
,多用户处理业务使用。
在上面使用异步线程也是一种多线程使用方式,另一种大家也能猜到肯定是使用线程池(推荐)。
@Bean("longThreadPool")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
// 核心线程数
poolTaskExecutor.setCorePoolSize(10);
// 最大线程数
poolTaskExecutor.setMaxPoolSize(15);
// 缓冲对列
poolTaskExecutor.setQueueCapacity(100);
// 允许线程空闲时间60s
poolTaskExecutor.setKeepAliveSeconds(60);
// 线程池前缀
poolTaskExecutor.setThreadNamePrefix("consumer-pool");
// 线程池对拒绝任务的处理策略
poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 关闭线程池的时候,是否等待当前任务执行完成
poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
// 等待当前任务完成的超时时间60秒,否则会造成阻塞
poolTaskExecutor.setAwaitTerminationSeconds(60);
poolTaskExecutor.initialize();
return poolTaskExecutor;
}
使用直接注入就可以:
@Autowired
private ThreadPoolTaskExecutor longThreadPool;
可以使用@Async
的进行处理,不做演示了,下面演示下对第一种模型的处理:
处理consumer
的消费消息
@GetMapping("/simple")
public String simple(@RequestParam("topic_name") String topicName) {
kafkaConsumer.subscribe(Arrays.asList(topicName));
new Thread(() -> {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records) {
// 对提交的数据进行处理
longThreadPool.execute(() -> {
log.info("消费信息:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
});
}
}
}, "获取消息线程 => ").start();
return "success";
}
八、Consumer
设置offset
consumer
提供了一个seek
的函数,可以设置我们开始的offset
位置开始消费。设置offset
:多用于回滚和重复消费。
我们本地的消费的最后offset
是399
,测试的我们从350
开始。
@GetMapping("/single_2")
public String singlePartition2(@RequestParam("topic_name") String topicName) {
TopicPartition partition = new TopicPartition(topicName, 0);
// 订阅topic中的某一个partition
kafkaConsumer.assign(Arrays.asList(partition));
new Thread(() -> {
kafkaConsumer.seek(partition, 350);
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
List<ConsumerRecord<String, String>> recordList = records.records(partition);
for (ConsumerRecord<String, String> record : recordList) {
log.info("消费信息入库:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
}
long offset = recordList.get(recordList.size() - 1).offset();
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.partitions().size());
offsetMap.put(partition, new OffsetAndMetadata(offset + 1));
kafkaConsumer.commitSync(offsetMap);
}, "获取消息线程<single> => ").start();
return "success";
}
从这里我们也可以知道,kafka
是不会丢弃消息的。
九、Consumer
限流操作
一般情况,系统不会给kafka
客户端,提供太多的资源,有时候会出现数据峰值,把kafka
打死,所以这个时候限流就很重要了。
一般来说:当处理的数据量达到某个阈值时暂停消费,低于阈值时则恢复消费,这就可以让Consumer
保持一定的速率去消费数据,从而避免流量剧增时将Consumer
给压垮。
这里我们利用Guava
的限流器对Consumer
进行限流。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
代码实现:
@GetMapping("/rate_limiter")
public String rateLimiter(@RequestParam("topic_name") String topicName) {
TopicPartition p0 = new TopicPartition(topicName, 0);
TopicPartition p1 = new TopicPartition(topicName, 1);
kafkaConsumer.assign(Arrays.asList(p0, p1));
new Thread(() -> {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10));
if (records.isEmpty()) {
continue;
}
if (!rateLimiter.tryAcquire()) {
log.warn("无法拿到令牌,开始限流...");
kafkaConsumer.pause(Arrays.asList(p0, p1));
} else {
log.info("拿到令牌,开始消费...");
kafkaConsumer.resume(Arrays.asList(p0, p1));
}
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> recordList = records.records(partition);
for (ConsumerRecord<String, String> record : recordList) {
log.info("消费信息入库:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
}
long offset = recordList.get(recordList.size() - 1).offset();
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.partitions().size());
offsetMap.put(partition, new OffsetAndMetadata(offset + 1));
kafkaConsumer.commitSync(offsetMap);
}
}
}, "限流器线程 => ").start();
return "success";
}