0
点赞
收藏
分享

微信扫一扫

查立得源码如何去除版权

陆公子521 03-25 13:30 阅读 2

前言

spring boot 集成kafka是比较简单的 直接引入spring-kafka的包 然后稍作配置即可

1. Spring Boot集成kafka

  • 添加 Kafka 依赖
    在 pom.xml 文件中添加 Kafka 依赖:

    
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
  • 配置 Kafka 属性
    在 application.properties 或 application.yml 文件中添加 Kafka 配置属性:

    # Kafka 服务器地址
    spring.kafka.bootstrap-servers=localhost:9092
    
    # 消费者配置
    spring.kafka.consumer.group-id=my-group
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
    # 生产者配置
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
  • 创建测试消费者

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaConsumer {
    
        @KafkaListener(topics = "my-topic", groupId = "my-group")
        public void consume(String message) {
            System.out.println("Received message: " + message);
        }
    }
    

    这里创建了一个 KafkaConsumer 类,使用 @KafkaListener 注解来监听名为 “my-topic” 的主题。当收到消息时,consume 方法会被调用,并打印出收到的消息。

  • 创建 Kafka 生产者

    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaProducer {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void send(String topic, String message) {
            kafkaTemplate.send(topic, message);
        }
    }
    

    这里创建了一个 KafkaProducer 类,通过注入 KafkaTemplate 来发送消息。send 方法接受主题和消息作为参数,并将消息发送到指定的主题。

  • 测试代码

    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    public class KafkaTests {
    
        @Autowired
        private KafkaProducer producer;
    
        @Test
        public void testProduceAndConsume() throws InterruptedException {
            String topic = "my-topic";
            String message = "Hello, Kafka!";
    
            producer.send(topic, message);
    
            // 等待消息被消费
            Thread.sleep(5000);
        }
    }
    

官方地址: https://docs.spring.io/spring-kafka/docs/2.8.11/reference/html/#getting-started

2. kafka的鉴权协议

  • PLAINTEXT

    这是 Kafka 默认的协议,不提供任何认证机制。所有客户端都可以不经认证就连接到 Kafka 集群。
    在生产环境中,强烈不建议使用这种协议,因为它不提供任何安全保障。

  • SSL

    SSL 提供了一种加密通信的方式,可以保护客户端和服务器之间的数据传输。
    SSL 需要在 Kafka 代理(Broker)和客户端之间建立信任关系,通常是通过 SSL 证书来实现的。
    SSL 可以单独使用,也可以与其他认证协议(如 SASL)结合使用,提供更高级别的安全性。

  • SASL/PLAIN

    SASL/PLAIN 是一种基于用户名和密码的简单认证机制。
    客户端需要提供有效的用户名和密码才能连接到 Kafka 集群。
    SASL/PLAIN 通常与 SSL 协议结合使用,以防止明文传输用户名和密码。

  • SASL/SCRAM

    SASL/SCRAM 是一种更安全的基于挑战-响应的认证机制。
    SASL/SCRAM 支持两种变体:SCRAM-SHA-256 和 SCRAM-SHA-512。
    SCRAM 使用 HMAC 和随机数来对密码进行哈希计算,而不会在网络上传输明文密码。
    SCRAM 通常也与 SSL 结合使用,以提供更高的安全性。

  • SASL/GSSAPI (Kerberos)
    SASL/GSSAPI 是基于 Kerberos 的认证协议。
    Kerberos 是一种广泛使用的网络认证协议,提供了集中式的认证和授权服务。
    Kerberos 使用票据(Ticket)系统来验证客户端身份,而不是直接传输密码。
    Kerberos 认证通常被认为是企业级环境中最安全的认证方式之一。

  • SASL/OAUTHBEARER
    SASL/OAUTHBEARER 是一种使用 OAuth 2.0 访问令牌(Access Token)进行认证的协议。
    OAuth 2.0 是一种广泛使用的授权框架,常用于授权第三方应用访问用户数据。
    SASL/OAUTHBEARER 需要客户端获取有效的 OAuth 2.0 访问令牌,并在连接 Kafka 时提供该令牌进行认证。

3. 连接 开放 SASL/PLAIN的kafka

kafka的server端开放了 SASL_PLAINTEXT 认证 同时使用了 SCRAM-SHA-256

yaml配置

spring:
  application:
    name: xxxx
  kafka:
    bootstrap-servers: 192.168.xxxx:9092,192.168.xxx:9092,192.168.xxx:9092
    security:
      #指定安全协议
      protocol: SASL_PLAINTEXT
    properties:
      # SCRAM 加密方法 256还是512
      sasl.mechanism: SCRAM-SHA-256
      # username  password 为在server端新建的用户 
      ## 注意 最后边有个;  
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx";

    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

配置好后 在使用 测试类进行验证
会出现如下log 就是配置成功了

2024-03-21 15:59:20.306  INFO [recommend,,] 84510 --- [           main] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
2024-03-21 15:59:20.338  INFO [recommend,,] 84510 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
2024-03-21 15:59:20.372  INFO [recommend,,] 84510 --- [           main] o.a.k.c.s.authenticator.AbstractLogin    : Successfully logged in.
2024-03-21 15:59:20.400  INFO [recommend,,] 84510 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.7.0
2024-03-21 15:59:20.400  INFO [recommend,,] 84510 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: xxxxx524ed625438c5
2024-03-21 15:59:20.401  INFO [recommend,,] 84510 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1711007960400

the end !!!

good day !!!

举报

相关推荐

0 条评论