配置
kafka:
bootstrap-servers: 101.34.177.108:9092 # 多个用英文逗号隔开
producer:
retries: 3 # 默认为 0,发送主题失败后重试的次数
batch-size: 100 # 默认为 0,批处理发送主题大小
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
listener:
ack-mode: MANUAL_IMMEDIATE # 手动提交模式
concurrency: 5 # 消费监听线程数,当配置值大于 Kafka 分区数,按分区数执行
poll-timeout: 5000 # 单次拉取消息的超时(毫秒)
missing-topics-fatal: false
consumer:
enable-auto-commit: false # 建议关闭自动提交 Offset,不然报错很难处理
auto-offset-reset: earliest
max-poll-records: 100 # 单次拉取最大记录数
group-id: item # 消费组,消费者多实例的情况下,配置同一个消费组,实例数不能超过 Topic 的分区数
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
pom依赖
<!-- kafaka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
相关工具类
/**
* @program: soap-item_v2
* @description: MessageObject
* @author: ZhangRiTian
* @create: 2021-11-15 16:20
*/
@Data
@ToString
public class MessageObject {
private Long id;
private Object msg;
private Date sendTime;
}
@Data
@ToString
public class Message {
private Long id;
private String msg;
private Date sendTime;
}
@Component
@Slf4j
public class KafkaSender {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
/**
* 发送消息给Kafaka对应的主题
* @param topic 主题
* @param msg 消息体
*/
public Boolean send(String topic , String msg) {
Message message = new Message();
try {
message.setId(System.currentTimeMillis());
message.setMsg(msg);
message.setSendTime(new Date());
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, msg);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
// 给发Kafaka主题,发送消息
kafkaTemplate.send(topic,gson.toJson(message));
log.info(topic+" - 生产者 发送消息失败:" + message.toString());
}
@Override
public void onSuccess(SendResult<String, String> stringObjectSendResult) {
// 给发Kafaka主题,发送消息
log.info(topic+" - 生产者 发送消息成功:" + message.toString());
}
});
} catch (Exception e) {
e.printStackTrace();
log.info(topic+" - 生产者 发送消息失败:", message.toString());
}
return true;
}
}
消费者类
package com.menglar.soap.item.common.lazada_kafaka_consumer;
import com.alibaba.fastjson.JSON;
import com.menglar.soap.item.service.LazadaItemService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* @program: soap-item_v2
* @description: kafaka消费者处理业务
* @author: ZhangRiTian
* @create: 2021-11-15 14:07
*/
@Component
@Slf4j
public class ItemCategoryConsumer {
@Autowired
private LazadaItemService lazadaItemService;
@Qualifier("threadPoolTaskExecutor")
@Autowired
private ThreadPoolTaskExecutor poolTaskExecutor;
/**
* 给商品设置默认的类目
*/
//@KafkaListener(topics = KafkaConstants.ITEM_CLAIM)
public void setItemRecommendCategory(ConsumerRecord<?, ?> record, Acknowledgment ack) {
log.info(record.topic()+" - 消费者 开始消费:" + record.value().toString());
try {
// 消费到批量设置类目的kafak主题数据处理业务,判断消费到的数据是否为空,不为空进行业务处理ok
if (record.value()!= null) {
Object value = record.value();
KafkaVO kafkaVO = JSON.parseObject((String) value, KafkaVO.class);
Long userId = kafkaVO.getUserId();
List<Long> itemList = kafkaVO.getItemList();
//开启多线程消费
for (Long itemId : itemList) {
CompletableFuture.runAsync(()-> lazadaItemService.setItemCategory(userId, itemId),poolTaskExecutor);
}
}
// 回调ack确认
ack.acknowledge();
log.info(record.topic()+" - 消费者 消费消息成功:" + record.value().toString());
} catch (Exception e) {
log.error(record.topic()+" - 消费者 消费消息失败:" + record.value().toString(),e.getStackTrace());
}
}
/**
* 子方法处理业务
*/
//
// @KafkaListener(topics = KafkaConstants.ITEM_CLAIM)
// public void operateMethod(ConsumerRecord<?, ?> record, Acknowledgment ack){
// log.info("开始监听");
// Object value = record.value();
// KafkaVO kafkaVO = JSON.parseObject((String) value, KafkaVO.class);
// Long userId = kafkaVO.getUserId();
// log.info("消费成功");
// }
}