文章目录
- 1. 技术选型
- 2. 导入依赖
- 3. kafka配置
- 4. 生产者(同步)
- 5. 生产者(异步)
- 6. 消费者
1. 技术选型
软件/框架 | 版本 |
jdk | 1.8.0_202 |
springboot | 2.5.4 |
kafka server | kafka_2.12-2.8.0 |
kafka client | 2.7.1 |
zookeeper | 3.7.0 |
2. 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3. kafka配置
properties版本
spring.application.name=springboot-kafka
server.port=8080
# kafka 配置
spring.kafka.bootstrap-servers=node1:9092
# producer 配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者每个批次最多存放的数据量,单位为字节(不是记录条数),此处为16KB
spring.kafka.producer.batch-size=16384
# 生产者一端总的可用缓冲区大小(单位:字节),此处设置为32MB,即 32 * 1024 * 1024 = 33554432
spring.kafka.producer.buffer-memory=33554432
# consumer 配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest - 如果找不到当前消费者的有效偏移量,则自动重置到最早的偏移量
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
# 消费者偏移量自动提交时间间隔
spring.kafka.consumer.auto-commit-interval=1000
yml版本
server:
  port: 8080
spring:
  application:
    name: springboot-kafka
  kafka:
    bootstrap-servers: 192.168.92.104:9092
    consumer:
      auto-commit-interval: 1000
      auto-offset-reset: earliest
      enable-auto-commit: true
      group-id: springboot-consumer-02
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 16384
      buffer-memory: 33554432
4. 生产者(同步)
package com.gblfy.demo.controller;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutionException;
/**
 * REST controller that publishes a message to Kafka and blocks until the
 * send result is available.
 */
@RestController
public class KafkaSyncController {
    private static final Logger log = LoggerFactory.getLogger(KafkaSyncController.class);

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    /**
     * Sends {@code message} to partition 0 of {@code topic-springboot-01}
     * with key 0 and waits for the outcome of the send.
     *
     * @param message payload taken from the request path
     * @return {@code "success"} when the send completed normally,
     *         {@code "failure"} when waiting was interrupted or the send failed
     */
    @RequestMapping("/send/sync/{message}")
    public String send(@PathVariable String message) {
        ListenableFuture<SendResult<Integer, String>> future =
                kafkaTemplate.send("topic-springboot-01", 0, 0, message);
        try {
            // Blocking on get() is what makes this send synchronous.
            SendResult<Integer, String> sendResult = future.get();
            RecordMetadata metadata = sendResult.getRecordMetadata();
            log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ", metadata.topic(), metadata.partition(), metadata.offset());
            return "success";
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.error("发送消息被中断: {}", e.getMessage(), e);
            return "failure";
        } catch (ExecutionException e) {
            // Log the full exception (with stack trace) instead of printing it to stderr.
            log.error("发送消息失败: {}", e.getMessage(), e);
            return "failure";
        }
    }
}
5. 生产者(异步)
package com.gblfy.demo.controller;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller that publishes a message to Kafka and reports the outcome
 * through a callback instead of waiting for it.
 */
@RestController
public class KafkaAsyncController {
    private static final Logger log = LoggerFactory.getLogger(KafkaAsyncController.class);

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    /**
     * Sends {@code message} to partition 0 of {@code topic-springboot-01}
     * with key 1 and registers a callback that logs the result.
     *
     * @param message payload taken from the request path
     * @return always {@code "success"}; the value is returned once the callback
     *         is registered and does not reflect whether the send succeeded
     */
    @RequestMapping("/send/async/{message}")
    public String sendAsync(@PathVariable String message) {
        // Asynchronous send: the result is handled in the callback below, not awaited here.
        ListenableFuture<SendResult<Integer, String>> future =
                kafkaTemplate.send("topic-springboot-01", 0, 1, message);
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable e) {
                // Log at ERROR and pass the throwable so the stack trace is kept.
                log.error("发送消息失败: {}", e.getMessage(), e);
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                RecordMetadata metadata = result.getRecordMetadata();
                log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ", metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
        return "success";
    }
}
6. 消费者
package com.gblfy.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Listener component that logs each record received from
 * {@code topic-springboot-01}.
 */
@Component
public class KafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    /**
     * Logs the topic, partition, offset, key and value of the given record.
     *
     * @param record the record handed to this listener
     */
    @KafkaListener(topics = "topic-springboot-01")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        final String topic = record.topic();
        final int partition = record.partition();
        final long offset = record.offset();
        final Integer key = record.key();
        final String value = record.value();
        log.info("消费者接收到消息主题:{} ,消息的分区:{} ,消息偏移量:{} ,消息key: {} ,消息values:{} ",
                topic, partition, offset, key, value);
    }
}