0
点赞
收藏
分享

微信扫一扫

面试又来到了Kafka

忍禁 1天前 阅读 1

Kafka 是一种高性能、分布式、可扩展的消息队列系统。目标是为了解决大规模实时数据流处理和消息传递的问题。多分区、多副本且基于ZooKeeper协调的分布式消息系统。

主要特点:1、高吞吐量:能够处理非常高的消息吞吐量,每秒数百万条消息。

2、持久性:将消息持久化存储在磁盘上,因此数据不会丢失。

3、分布式: Kafka 是一个分布式系统,可以通过横向扩展来增加容量和吞吐量。

4、多副本复制: Kafka 支持多副本复制,确保数据的高可用性和容错性。

5、实时流处理: Kafka 的设计使其非常适合实时数据流处理场景,如日志收集、事件处理、指标监控等。

主要组成:

生产者(Producer):消息生产者,就是向 kafka broker 发消息的客户端,可以向多个主题发送消息。

消费者(Consumer):消息消费者,向 kafka broker 取消息的客户端,可以订阅多个主题的消息。

消费者组:由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

Broker :Kafka 集群中服务器节点。每个 Broker 负责管理多个分区和副本,并接收来自生产者的消息并为消费者提供消息。多个 Broker 组成一个 Kafka 集群,形成高可用性的数据存储和处理平台。

主题(topic) :主题是消息的逻辑分类。在 Kafka 中,消息被发布到一个或多个主题中。每个主题可以有多个分区,每个分区在不同的 Broker 上进行副本复制,以实现负载均衡和故障容错。

分区(Partition):分区是主题的物理划分,用于实现消息在集群中的并行处理。每个主题可以被划分成多个分区,每个分区存储着一部分数据。分区使得 Kafka 集群能够处理大量数据并支持水平扩展。

副本(Replica):副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,由 一个 leader 和若干个 follower组成。

leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。

follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据 的同步。leader 发生故障时,某个 follower 会成为新的 leader

ZooKeeper:开源的分布式协调服务,通常与Kafka一起使用来管理Broker和Topic的元数据。用于维护配置信息、提供分布式锁服务、协调Leader选举等。在Kafka集群中,它帮助管理Broker的注册和发现,以及Topic的配置信息。

副本的工作机制也很简单:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步。

Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。

SpringBoot集成kafuka:

引入依赖:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

配置kafuka:

server.port=1008
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=10.10.0.18:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50

# 设置消息的自定义分区策略
spring.kafka.producer.properties.partitioner.class=com.shepherd.kafka.partition.CustomizePartitioner

生产者:

package com.shepherd.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author fjzheng
 * @version 1.0
 * @date 2022/1/24 18:10
 */
@RestController
@RequestMapping("/api/kafka/produce")
public class ProducerController {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     *  Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,
     *  所以成功与否不确定,不带回调的发送消息是不能保证消息成功发送的,最终可能导致消息丢失。
     * @param message
     */
    @GetMapping("/{message}")
    public void sendMessageNoCallback(@PathVariable("message") String message) {
        kafkaTemplate.send("topic1", message);
    }

    /**
     *
     * @param message
     */
    @GetMapping("/callback1/{message}")
    public void sendMessage2(@PathVariable("message") String message) {
        kafkaTemplate.send("topic1", message).addCallback(success -> {
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            System.out.println("发送消息失败:" + failure.getMessage());
        });
    }

    @GetMapping("/callback2/{message}")
    public void sendMessage3(@PathVariable("message") String message) {
        kafkaTemplate.send("topic1", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:"+ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
    }
}

消费者:

@Component
public class KafkaConsumerConfig {
    // 消费监听
    @KafkaListener(topics = {"topic1"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
}

举报

相关推荐

0 条评论