0
点赞
收藏
分享

微信扫一扫

Kafka集群如何实现相互感知

上一篇 <<<Kafka的设计原理介绍
下一篇 >>>Kafka如何实现分区及指定分区消费


命令操作

./bin/kafka-topics.sh --create --zookeeper 10.211.55.16:2181 --replication-factor 2 --partitions 3 --topic test1221
./bin/kafka-topics.sh --describe --zookeeper 10.211.55.16:2181 --topic test1220
./bin/kafka-console-producer.sh --broker-list 10.211.55.16:9092 --topic test1220
./bin/kafka-console-consumer.sh --bootstrap-server 10.211.55.16:9092 --topic test1220 --from-beginning

代码操作

# kafka
spring:
  kafka:
    # kafka服务器地址(可以多个)
    bootstrap-servers: 10.211.55.16:9092,10.211.55.23:9092,10.211.55.24:9092
    consumer:
      # 指定一个默认的组名
      group-id: kafka2
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288
      # 服务器地址
      bootstrap-servers: 10.211.55.16:9092,10.211.55.23:9092,10.211.55.24:9092

@Component
public class UserLogProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;
 
    /**
     * 发送数据
     * @param userid
     */
    public void sendLog(String userid){
        UserLog userLog = new UserLog();
        userLog.setUsername("jhp").setUserid(userid).setState("0");
        System.err.println("发送用户日志数据:"+userLog);
        kafkaTemplate.send("test1221", JSON.toJSONString(userLog));
    }
}

@Component
@Slf4j
public class UserLogConsumer {

    @KafkaListener(topics = {"test1221"})
    public void consumer(ConsumerRecord<?,?> consumer){

        System.out.println("topic名称:" + consumer.topic() + ",key:" +
                consumer.key() + "," +
                "分区位置:" + consumer.partition()
                + ", 下标" + consumer.offset());
    }
}

推荐阅读:
<<<消息中间件的核心思想
<<<消息中间件常见问题汇总
<<<基于Netty简单手写消息中间件思路
<<<消息队列常用名词与中间件对比
<<<Rabbitmq基础知识
<<<Rabbitmq示例之点对点简单队列
<<<Rabbitmq示例之工作(公平)队列
<<<Rabbitmq示例之发布订阅模式
<<<Rabbitmq示例之路由模式Routing
<<<Rabbitmq示例之通配符模式Topics
<<<Rabbitmq示例之RPC模式
<<<Rabbitmq队列模式总结
<<<Rabbitmq如何保证消息不丢失
<<<Springboot利用AmqpTemplate整合Rabbitmq
<<<Rabbitmq如何保证幂等性
<<<Rabbitmq的重试策略
<<<Rabbitmq通过死信队列实现过期监听
<<<Rabbitmq解决分布式事务思路
<<<Rabbitmq解决分布式事务demo
<<<Rabbitmq环境安装
<<<Kafka中的专业术语都有哪些
<<<Kafka的设计原理介绍
<<<Kafka如何实现分区及指定分区消费
<<<Kafka如何保证消息顺序消费
<<<Kafka如何保证高吞吐量
<<<Kafka集群环境搭建
<<<RocketMQ架构原理
<<<RocketMQ、RabbitMQ和Kafka的对比
<<<SpringBoot整合RocketMQ示例
<<<RocketMQ保证顺序消费demo
<<<RocketMQ如何动态扩容和缩容
<<<RocketMQ如何解决分布式事务
<<<RocketMQ单机版本安装
<<<RocketMQ集群环境程序启用相关知识点
<<<RocketMQ单机做主备实操
<<<RocketMQ所有配置说明

举报

相关推荐

0 条评论