0
点赞
收藏
分享

微信扫一扫

Kafka与zookeeper集群保证数据可靠、有序、配置负载均衡

Kafka应答ack 模式

保证数据可靠

如果分区副本设置为1个,或 者ISR里应答的最小副本数量 ( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一 样的,仍然有丢数的风险(leader:0,isr:0)。

• 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

代码实现

public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //属性配置
        Properties properties = new Properties();
        //连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.6.101:9092");
        //指定k、v序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 设置 acks
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        // 重试次数 retries,默认是 int 最大值,2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        //创建生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //像first主题发送数据
        kafkaProducer.send(new ProducerRecord<>("first", 1,"","lzq"),new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e==null){
                    System.out.println("发送成功,主题"+recordMetadata.topic()+"分区"+recordMetadata.partition());
                }
            }
        }).get();
        //关闭资源
        kafkaProducer.close();
    }
}

数据重复问题

发送数据时,leader接收到,像follower同步,在应答时挂掉了,此时会重新选主,消费者再次发送数据,这时候数据就重复了 

幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。 精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2)

重复数据的判断标准:具有相同主键的消息提交时,Broker只会持久化一条。其 中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。 所以幂等性只能保证的是在单分区单会话内不重复。

开启参数 enable.idempotence 默认为 true,false 关闭

运行过程

 代码实现

public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //属性配置
        Properties properties = new Properties();
        //连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.6.101:9092");
        //指定k、v序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //关联自定义分区
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.lzq.producer.MyPartitioner");
        // 设置 acks
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // 重试次数 retries,默认是 int 最大值,2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        //指定事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"t1");
        //创建生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //初始化、开启时候
        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        try {
            //像first主题发送数据
            kafkaProducer.send(new ProducerRecord<>("first", 1,"","lzq"),new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e==null){
                        System.out.println("发送成功,主题"+recordMetadata.topic()+"分区"+recordMetadata.partition());
                    }
                }
            });
            //提交事务
            kafkaProducer.commitTransaction();
        }catch (Exception e){
            //回滚事务
            kafkaProducer.abortTransaction();
        }finally {
            //关闭资源
            kafkaProducer.close();
        }
    }
}

数据乱序

负载均衡

创建一个josn文件vim topics-to-move.json

生成一个负载均衡的计划

会自动生成负载均衡计划

在创建一个josn文件,复制对应计划

执行计划

验证计划

退役旧节点

重新生成执行计划

启动脚本

举报

相关推荐

0 条评论