0
点赞
收藏
分享

微信扫一扫

【RabbitMQ-5】-生产者保证消息安全(confirm配置)

1. 生产者不丢失数据

1.1 开启事务模式

RabbitTemplate代码:

  @Bean
    @Autowired
   public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
       RabbitTemplate rabbitTemplate = new RabbitTemplate();
       rabbitTemplate.setConnectionFactory(connectionFactory);
       rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
   }

(1)生产者发送消息,若是MQ服务器挂掉,那么程序会不断尝试重试,直至broker恢复,会重新接收这个消息。
(2)若是消费者在ack之前挂掉,MQ服务器会将这条消息恢复,若是长时间没有收到consumer端的确认消息,那么会将消息从unacked状态转化为ready状态。
(3)若是消费者处理消息期间抛出异常,MQ服务器会收到一个nack或者reject,MQ服务器也会恢复这条消息。

开启事务会大幅降低消息发送及接收效率,因为当已经有一个事务存在时,后面的消息是不能被发送或者接收(对同一个consumer而言)的。

1.2 confirm模式

为了producer端知道消息是否进入queue,可以使用confirmreturn来代替事务。

confirm和return的配置:

  //消息的确认机制(confirm);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);

confirm和return的区别和联系:

  • confirmCallBack:消息从生产者到达exchange时返回ack,消息未到达exchange返回nack
  • returnCallBack:消息进入exchange但未进入queue时会被调用。

官网对confirm和return的描述:

大概意思就是:在rabbit Template发送操作完成时,channels才会关闭。在连接工厂(ConnectionCache)满的情况下,缓存中有空间时,channel不会关闭,直到confirm/return处理完成。
使用confirm时,将在接受到ack时关闭channels;使用return时,通道会整整持续5s的时间。通常建议将Connection Cache设置足够大的值,以便发布的消息的channel可以返回连接池而不是关闭。当看到channel快速打开或者关闭时,应该考虑增加连接池大小以减少服务器的开销。

1.2.1 生产者消息丢失的情况

(1)rabbitmq由于短暂的网络异常,导致消息发送了出去,但是未到exchange,连接可以短时间恢复。
(2)rabbitmq服务器挂掉且长时间无法恢复,消息无法发送。

1.2.2 生产者消息发送失败(网络异常)

当消息发送失败,一般两种方式处理这个消息:

  1. 自动重发;
  2. 系统预警人工处理;

配置文件源码:

 rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            }
        });
        rabbitTemplate.setReturnCallback(new ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });

我们可以看到,在ReturnCallback中,返回的参数是Message对象,我们可以获取消息内容exchangeroutingKey这些信息的。

但是在ConfirmCallback中,确是没有消息信息,只有一个correlationData[ˌkɒrəˈleɪʃn] 相关性的,并且我们看到他的日志,打印出来还是null

输出日志:

2019-03-10-00-52 [AMQP Connection 127.0.0.1:5672] [com.Configuration.MyAMQPConfig]
 [INFO] - 消息发送成功:correlationData(null),ack(false),cause
(channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no 
exchange 'exchange_direct_no' in vhost '/', class-id=60, method-id=40))

于是我们打开correlationData的源码:

可以看到里面的属性只有一个id

public class CorrelationData implements Correlation {
    private volatile String id;

    public CorrelationData() {
    }

    public CorrelationData(String id) {
        this.id = id;
    }

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String toString() {
        return "CorrelationData [id=" + this.id + "]";
    }
}

于是我们打开RabbitTemplate send/convertAndSend方法的源码:

发现里面含有CorrelationData对象,很显然我们在发送消息的时候,消息信息和correnlationData.id属性进行了绑定,我们若是可以根据id拿到消息,那么就可以进行“重试”或者“预警”等操作了。

于是我们扩展correlationData类,将id和消息属性绑定起来。

public class CorrelationData extends 
org.springframework.amqp.rabbit.support.CorrelationData {
    //消息体
    private volatile Object message;
    //交换机
    private String exchange;
    //路由键
    private String routingKey;
    //重试次数
    private int retryCount = 0;

我们在发送消息的时候,可以发送correlationData扩展对象,在我们confirmack=false的情况下,于是我们就可以拿到消息主体了。

 @Test
    public void contextLoads() {
        Map<String, Object> map = new HashMap<>();
        Book book = new Book("西游记", "120.00");
        //使用继承扩展的CorrelationData 、id消息流水号
        CorrelationData correlationData = 
               new CorrelationData(UUID.randomUUID().toString()); 
        correlationData.setMessage(book);
        correlationData.setExchange("exchange_direct_no");
        correlationData.setRoutingKey("ord");
        try {
            rabbitTemplate.convertAndSend("exchange_direct_no", "ord", book, correlationData);
        } catch (AmqpConnectException e) {
            System.out.println("保存信息编号:" + correlationData);
        }
    }

现在我们可以拿到消息主体,也可以拿到rabbitTemplate,那么我们是否可以在confirm回调方法中再次重试?

ConfirmCallback回调函数:

public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
    private static final Logger logger = LoggerFactory.getLogger(MessageConfirmCallback.class);
    private RabbitTemplate rabbitTemplate;

    public MessageConfirmCallback(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack == true) {
            logger.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
        } else {
            if (correlationData instanceof com.Configuration.CorrelationData) {
                com.Configuration.CorrelationData messageCorrelationData = (com.Configuration.CorrelationData) correlationData;
                String exchange = messageCorrelationData.getExchange();
                Object message = messageCorrelationData.getMessage();
                String routingKey = messageCorrelationData.getRoutingKey();
                int retryCount = messageCorrelationData.getRetryCount();
                //重试次数+1
                ((com.Configuration.CorrelationData) correlationData).setRetryCount(retryCount + 1);
                rabbitTemplate.convertSendAndReceive(exchange, routingKey, message, correlationData);
            }
        }
    }
}

注意事项:
但是在主线程发送消息的过程中,rabbitMQf服务器关闭,这时候主程序和ConfirmCallback线程都会等待Connection恢复,然后重新启动rabbitmq,当应用程序重新建立connection之后,两个线程都会死锁

解决方案:
ack=fasle的情况下,可以将消息存到缓存中,定时发起任务重发。

1.2.2 生产者消息发送失败(MQ服务器异常)

对于这种情况,confirm是没有发送出去的,但是消息丢失怎么处理,但是会抛出AmqpConnectException异常,我们可以捕获该异常,然后将msgId也就是CorrelationData对象保存即可。

 @Test
    public void contextLoads() {
        Map<String, Object> map = new HashMap<>();
        Book book = new Book("西游记", "120.00");
        CorrelationData msgId=new CorrelationData();
        try {
            rabbitTemplate.convertAndSend("exchange_direct_no", "ord", book,msgId);
        }catch (AmqpConnectException e){
            System.out.println("保存信息编号:"+msgId);
        }

推荐阅读:
https://www.jianshu.com/p/9aec19a910b1
https://blog.csdn.net/qq315737546/article/details/66475103
https://www.colabug.com/2325507.html
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

举报

相关推荐

0 条评论