0
点赞
收藏
分享

微信扫一扫

Spring boot + RabbitMQ 之 消息确认

代码敲到深夜 2021-09-28 阅读 78

RabbitMQ消息 确认分为两种:一是生产确认,二是消费确认
RabbitMQ本身支持两种 确认方式:一是事务确认,二是ACK确认

这里直接介绍Spring Boot+RabbitMQ 的消息确认(ACK)

一:生产确认

生产者确认需要在生产的地方实现 RabbitTemplate.ConfirmCallback

@Service
public class PersonalService implements RabbitTemplate.ConfirmCallback{

    @Autowired
    public UserInfoDao userInfoDao;
    @Autowired
    public MailTools mailTools;
    @Autowired
    public LoginService loginService;
    public RabbitTemplate rabbitTemplate;
    
    static Logger log = Logger.getLogger(PersonalService.class);

    /**
     * 需要通过生产者的构造器去注入RabbitTemplate,并设置他 回调确认对象为 当前对象。
     */
    public PersonalService(RabbitTemplate rabbitTemplate){
        this.rabbitTemplate=rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(this);
    }
}

实现了 ConfirmCallback 之后需要实现 confirm 方法

    /**
     * 消息发送到队列回调该方法
     * correlationData : 发送消息时给的id
     * cause : 会返回错误信息,正确为null
     * ack:是否正确发送
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // TODO Auto-generated method stub
        System.out.println("confirm --> "+correlationData.getId()+" ->"+ack+" ->"+cause);
    }

二:消费确认

在Rabbitmq+Springboot中,消费者的实现方式为注解方式:

@Component
public class MessageReceiver {

    @RabbitListener(queues = AmqpConfiguration.QUEUE)
    //@RabbitHandler
    public void receive(String hello, Channel channel, Message message){
        // 限流处理:消息体大小不限制,每次限制消费一条,只作用于该Consumer层,不作用于Channel
        channel.basicQos(0, 1, false);//限制于消费级别

        String messsageText = new String(message.getBody());
        System.out.println("[receiver] receive message : "+ messsageText);
        try {
            if(validate(messsageText)){
                System.out.println("[receiver] confirm");
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认消息接收
            }else{
                System.out.println("[receiver] reject");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//拒绝消息接收
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private boolean validate(String messsageText) {
        return !(messsageText!=null && messsageText.indexOf("fuck")>-1);
    }
}

-> @RabbitListener:监听的队列
-> @RabbitHandler:但@RabbitListener注解在类上时,需要使用@RabbitHandler来指明调用的方法。
-> void basicAck(long deliveryTag, boolean multiple) throws IOException; 确认消息接收 deliveryTag:该消息的index,multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
-> channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); 拒绝消息接收 deliveryTag:该消息的index,multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息,requeue:被拒绝的是否重新入队列。
-> channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); 拒绝消息接收 deliveryTag:该消息的index,requeue:被拒绝的是否重新入队列,channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息。
-> chanel.basicQos(int prefetchSize, int prefetchCount, boolean global) 消息限流的功能,防止生产过多,导致消费者消费吃力的情况;
prefetchSize: 0表示对消息的大小无限制,单位为(B-字节)
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将 阻塞 掉,直到有消息ack。0为无上限
global:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别。


举报

相关推荐

0 条评论