Kafka 是一种高性能、分布式、可扩展的消息队列系统。目标是为了解决大规模实时数据流处理和消息传递的问题。多分区、多副本且基于ZooKeeper协调的分布式消息系统。
主要特点:1、高吞吐量:能够处理非常高的消息吞吐量,每秒数百万条消息。
2、持久性:将消息持久化存储在磁盘上,因此数据不会丢失。
3、分布式: Kafka 是一个分布式系统,可以通过横向扩展来增加容量和吞吐量。
4、多副本复制: Kafka 支持多副本复制,确保数据的高可用性和容错性。
5、实时流处理: Kafka 的设计使其非常适合实时数据流处理场景,如日志收集、事件处理、指标监控等。
主要组成:
生产者(Producer):消息生产者,就是向 kafka broker 发消息的客户端,可以向多个主题发送消息。
消费者(Consumer):消息消费者,向 kafka broker 取消息的客户端,可以订阅多个主题的消息。
消费者组:由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
Broker :Kafka 集群中的服务器节点。每个 Broker 负责管理多个分区和副本,并接收来自生产者的消息并为消费者提供消息。多个 Broker 组成一个 Kafka 集群,形成高可用性的数据存储和处理平台。
主题(topic) :主题是消息的逻辑分类。在 Kafka 中,消息被发布到一个或多个主题中。每个主题可以有多个分区,每个分区在不同的 Broker 上进行副本复制,以实现负载均衡和故障容错。
分区(Partition):分区是主题的物理划分,用于实现消息在集群中的并行处理。每个主题可以被划分成多个分区,每个分区存储着一部分数据。分区使得 Kafka 集群能够处理大量数据并支持水平扩展。
副本(Replica):副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,由 一个 leader 和若干个 follower组成。
leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据 的同步。leader 发生故障时,某个 follower 会成为新的 leader
ZooKeeper:开源的分布式协调服务,通常与Kafka一起使用来管理Broker和Topic的元数据。用于维护配置信息、提供分布式锁服务、协调Leader选举等。在Kafka集群中,它帮助管理Broker的注册和发现,以及Topic的配置信息。
副本的工作机制也很简单:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步。
Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。
SpringBoot集成kafuka:
引入依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置kafuka:
server.port=1008
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=10.10.0.18:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50
# 设置消息的自定义分区策略
spring.kafka.producer.properties.partitioner.class=com.shepherd.kafka.partition.CustomizePartitioner
生产者:
package com.shepherd.kafka.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author fjzheng
* @version 1.0
* @date 2022/1/24 18:10
*/
@RestController
@RequestMapping("/api/kafka/produce")
public class ProducerController {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,
* 所以成功与否不确定,不带回调的发送消息是不能保证消息成功发送的,最终可能导致消息丢失。
* @param message
*/
@GetMapping("/{message}")
public void sendMessageNoCallback(@PathVariable("message") String message) {
kafkaTemplate.send("topic1", message);
}
/**
*
* @param message
*/
@GetMapping("/callback1/{message}")
public void sendMessage2(@PathVariable("message") String message) {
kafkaTemplate.send("topic1", message).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}
@GetMapping("/callback2/{message}")
public void sendMessage3(@PathVariable("message") String message) {
kafkaTemplate.send("topic1", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
}
消费者:
@Component
public class KafkaConsumerConfig {
// 消费监听
@KafkaListener(topics = {"topic1"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}