0
点赞
收藏
分享

微信扫一扫

SpringBoot--实战开发--整合Kafka(六十三)

亿奇学 2021-09-21 阅读 44

一、Kafka简介

  Kafka是一个分布式消息队列。★Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。

二、Maven依赖

<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

三、配置

application.properties

# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.77.132:9092
# 生产者序列化
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.linger.ms=1
# 消费者配置
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100ms
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.session.timeout.ms=15000
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-hello-group

四、简单测试

  1. 控制器
/**
 * Kafka消息队列测试
 */
@RestController
@Slf4j
public class KafkaController {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 生成消息
     */
    @GetMapping("send")
    public void produce() {
        kafkaTemplate.send("test_topic", "kafka消费测试");
        log.info("生产者消息发送成功");
    }

    /**
     * 消费消息
     * @param record
     */
    @KafkaListener(topics = "test_topic")
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("topic = {}, offset = {}, value = {} \n", record.topic(), record.offset(), record.value());
    }
}

  1. 测试
    http://localhost:8081/send


    通过Kafka管理器查看:
    http://192.168.77.132:8080


常见问题:

  1. kafka启动报错[could not be established. Broker may not be available.]
    将配置中的localhost改为IP地址。
listeners=PLAINTEXT://192.168.77.132:9092
举报

相关推荐

0 条评论