消息中间件之kakfa生产者
- 二、异步发送消息
- 三、同步发送消息
- 四、异步回调
- 五、消息传递保障
- 六、`Partition`负载均衡
@Bean
public KafkaProducer<String, String> kafkaProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.26:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all"); // 消息确认机制配置
properties.put(ProducerConfig.RETRIES_CONFIG, "0"); // 重试
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); // 批次大小
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1"); // 多长时间发送一个批次
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); // 缓冲
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 序列化
return new KafkaProducer<>(properties);
}
二、异步发送消息
现在控制器中注入我们上面的bean
@Autowired
private KafkaProducer<String, String> kafkaProducer;
接着我们在服务器中执行:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.26:9092 --topic long-topic --from-beginning
这个将去接受我们一会发送消息。监听的Topic是long-topic
。
@GetMapping("/send")
public String sendMsg(@RequestParam("topicName") String topicName, @RequestParam(value = "num", defaultValue = "5") Integer num) {
for (int i = 0; i < num; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "id-" + i, "这是第" + i + "条消息");
kafkaProducer.send(record);
}
return "success";
}
启动项目,请求接口,我们在控制台将会看到如下的输出:
这说明我们已经成功了。
为什么说是异步发送呢?我们看一下send
这个函数:
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
返回的是Future
这应该就可以说明问题了吧!
三、同步发送消息
其实说是同步,但是相当于还是异步,只是使用get
方法,阻塞了一下,正确的说应该是异步阻塞发送消息,下面看例子演示:
@GetMapping("/send2")
public Map<String, String> sendMsg2(@RequestParam("topicName") String topicName, @RequestParam(value = "num", defaultValue = "5") Integer num) throws ExecutionException, InterruptedException {
Map<String, String> res = new HashMap<>(num);
for (int i = 0; i < num; i++) {
String key = "id-" + i;
String val = "这是第" + i + "条同步消息";
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, val);
Future<RecordMetadata> future = kafkaProducer.send(record);
RecordMetadata metadata = future.get();
res.put(key, metadata.toString() + "发送成功");
}
return res;
}
通过postman
测试接口,返回如下:
在服务器的控制台我们也收到了,发送的消息:
四、异步回调
使用异步回调,在回调中处理一下我们的业务。这个也很简单!
@GetMapping("/callback")
public Map<String, String> sendCallBack(@RequestParam("topicName") String topicName, @RequestParam(value = "num", defaultValue = "5") Integer num) throws ExecutionException, InterruptedException {
Map<String, String> res = new HashMap<>(num);
for (int i = 0; i < num; i++) {
String key = "id-" + i;
String val = "这是第" + i + "条同步消息";
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, val);
Future<RecordMetadata> future = kafkaProducer.send(record, (metadata, exception) -> {
res.put(key, metadata.toString() + "发送成功");
});
}
return res;
}
五、消息传递保障
在kafka
里面提供三种消息传递保障:
- 最多一次(收到0到1次)只发不收
- 至少一次(收到1到多次)
- 正好一次(有且仅有一次)消耗资源最多
消息传递保障依赖于producer
和Consumer
共同实现,主要是依赖于producer
。
我们在上面注入的bean
里面有设置:
properties.put(ProducerConfig.ACKS_CONFIG, "all");
当producer
需要server
接收到数据之后发出的确认接受的信号,这个配置就是指producer
需要多少个这样的确认信号。
这个ProducerConfig.ACKS_CONFIG
一共主要有三个选项:acks=0
、acks=1
和acks=all
。
-
acks=0
:设置为0表示producer
不需要等待任何确认消息。副本立即加到socket buffer
并认为已经发送。没有任何保障可以保证次情况下server
已经成功接受数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset
会总是设置为-1。 -
acks=1
:这意味着至少要等待leader
已经成功将数据写入本地的log,但是并没有等待所有的follower
是否成功写入。这种情况下,如果follower
没有成功备份数据,而此时leader
又挂掉,则消息会丢失。 -
acks=all
:这意味着leader
需要等待所有备份都成功写入日志,这种策略会保证只有一个备份存活就不会丢失数据。这是最强的保证。
六、Partition
负载均衡
在上一节中我们可以看到topic
的描述信息,一个topic
包含多个partition
,一般的来说多个partition
是随机的选择,下面看一下如何自定义选择partition
的策略
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* @author long
*/
public class LoadBalancingPartition implements Partitioner {
/**
* 确认什么的key对应什么样的partition(具体业务,具体设置,这里只做演示)
* @param topic
* @param key
* @param keyBytes
* @param value
* @param valueBytes
* @param cluster
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int keyId = Integer.parseInt(key.toString().split("-")[1]);
return keyId % 2;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
这个自定义partition
负载均衡的是和我们具体业务有关。