0
点赞
收藏
分享

微信扫一扫

KAFKA 集成 SpringBoot2 消息发送和消费消息(基础篇)


文章目录

  • ​​1. 技术选型​​
  • ​​2. 导入依赖​​
  • ​​3. kafka配置​​
  • ​​4. 生产者(同步)​​
  • ​​5. 生产者(异步)​​
  • ​​6. 消费者​​
1. 技术选型

软件/框架

版本

jdk

1.8.0_202

springboot

2.5.4

kafka server

kafka_2.12-2.8.0

kafka client

2.7.1

zookeeper

3.7.0

2. 导入依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

3. kafka配置

properties版本

spring.application.name=springboot-kafka
server.port=8080
# kafka 配置
spring.kafka.bootstrap-servers=node1:9092

# producer 配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者每个批次最多方多少条记录
spring.kafka.producer.batch-size=16384
# 生产者一端总的可用缓冲区大小,此处设置为32M * 1024 * 1024
spring.kafka.producer.buffer-memory=33544432

# consumer 配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest - 如果找不到当前消费者的有效偏移量,则自动重置向到最开始
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
# 消费者偏移量自动提交时间间隔
spring.kafka.consumer.auto-commit-interval=1000

yml版本

server:
port: 8080
spring:
application:
name: springboot-kafka
kafka:
bootstrap-servers: 192.168.92.104:9092
consumer:
auto-commit-interval: 1000
auto-offset-reset: earliest
enable-auto-commit: true
group-id: springboot-consumer-02
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
batch-size: 16384
buffer-memory: 33544432

4. 生产者(同步)

package com.gblfy.demo.controller;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class KafkaSyncController {
private final static Logger log = LoggerFactory.getLogger(KafkaSyncController.class);

@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate;


@RequestMapping("/send/sync/{message}")
public String send(@PathVariable String message) {

//同步发送消息
ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send("topic-springboot-01", 0, 0, message);
try {
SendResult<Integer, String> sendResult = future.get();
RecordMetadata metadata = sendResult.getRecordMetadata();
log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ", metadata.topic(), metadata.partition(), metadata.offset());
// System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "success";
}

}

5. 生产者(异步)

package com.gblfy.demo.controller;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaAsyncController {
private final static Logger log = LoggerFactory.getLogger(KafkaAsyncController.class);

@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate;

//设置回调函数,异步等待broker端的返回结束
@RequestMapping("/send/async/{message}")
public String sendAsync(@PathVariable String message) {

//同步发送消息
ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send("topic-springboot-01", 0, 1, message);
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable e) {
log.info("发送消息失败: {}", e.getMessage());
}

@Override
public void onSuccess(SendResult<Integer, String> result) {
RecordMetadata metadata = result.getRecordMetadata();
log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ", metadata.topic(), metadata.partition(), metadata.offset());
}
});
return "success";
}
}

6. 消费者

package com.gblfy.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {
private final static Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

@KafkaListener(topics = {"topic-springboot-01"})
public void onMessage(ConsumerRecord<Integer, String> record) {
log.info("消费者接收到消息主题:{} ,消息的分区:{} ,消息偏移量:{} ,消息key: {} ,消息values:{} ",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}


举报

相关推荐

0 条评论