0
点赞
收藏
分享

微信扫一扫

SpringAMQP开启“可靠性”机制

何以至千里 2024-02-23 阅读 13

延迟队列设计

在开发过程中经常会用到延迟队列，例如订单生成后有30分钟的付款时间，注册时有60秒的邮件或者短信的发送读取时间等。
使用 RabbitMQ 设计延迟队列，常见的有两种方式：

  1. 创建一个普通队列暂存带过期时间（TTL）的消息，消息过期后经死信交换机（DLX）转入死信队列，由消费者监听死信队列
  2. 使用延迟消息插件（rabbitmq_delayed_message_exchange）

image-20231119180143512

image-20230619235935374

配置

package com.wnhz.mq.common.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the queues, exchanges and bindings used by the TTL + dead-letter delay setup.
 *
 * <p>{@code normal_queue_test} is declared with dead-letter arguments that point at
 * {@code dlx_exchange_test} with routing key {@code dlx_routing_key}; that exchange is
 * bound to {@code dlx_queue_test}.
 */
@Configuration
public class DlxConfig {

    // Dead-letter side.
    private static final String DLX_QUEUE = "dlx_queue_test";
    private static final String DLX_EXCHANGE = "dlx_exchange_test";
    private static final String DLX_ROUTING_KEY = "dlx_routing_key";

    // Normal (holding) side.
    private static final String NORMAL_QUEUE = "normal_queue_test";
    private static final String NORMAL_EXCHANGE = "normal_exchange_test";
    private static final String NORMAL_ROUTING_KEY = "normal_routing_test";

    /** @return the queue bound to the dead-letter exchange */
    @Bean
    public Queue dlxQueue() {
        return new Queue(DLX_QUEUE);
    }

    /** @return the direct exchange named as the dead-letter exchange of the normal queue */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE);
    }

    /** @return the binding of the dead-letter queue to the dead-letter exchange */
    @Bean
    public Binding dlxBinding() {
        final Queue queue = dlxQueue();
        final DirectExchange exchange = dlxExchange();
        return BindingBuilder.bind(queue).to(exchange).with(DLX_ROUTING_KEY);
    }

    /**
     * @return the normal queue, declared with {@code x-dead-letter-exchange} and
     *         {@code x-dead-letter-routing-key} arguments
     */
    @Bean
    public Queue normalQueue() {
        final Map<String, Object> deadLetterArgs = new HashMap<>();
        deadLetterArgs.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        deadLetterArgs.put("x-dead-letter-exchange", DLX_EXCHANGE);

        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(NORMAL_QUEUE, durable, exclusive, autoDelete, deadLetterArgs);
    }

    /** @return the direct exchange the normal queue is bound to */
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    /** @return the binding of the normal queue to the normal exchange */
    @Bean
    public Binding normalBinding() {
        final Queue queue = normalQueue();
        final DirectExchange exchange = normalExchange();
        return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY);
    }

}
# Application configuration for the "book-consumer" service.
server:
  # HTTP port the application listens on.
  port: 10005

spring:
  application:
    name: book-consumer
  autoconfigure:
    # Excludes the Druid and JDBC DataSource auto-configurations.
    # NOTE(review): presumably because this module has no database; confirm.
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfiguration, org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
  rabbitmq:
    # Broker connection settings.
    # NOTE(review): host and credentials are hard-coded in plain text; consider
    # externalizing them (environment variables / secret store) outside of demos.
    host: 192.168.198.130
    port: 5672
    username: admin
    password: 123
    # Publisher confirms, correlated with the CorrelationData supplied on send.
    publisher-confirm-type: correlated
    # Enables returned-message callbacks on the publisher side.
    publisher-returns: true
    listener:
      simple:
        # Prefetch count for the listener container.
        prefetch: 1
        # Acknowledgement mode for the listener container.
        acknowledge-mode: auto
logging:
  level:
    # Debug-level logging for the consumer package.
    com.wnhz.mq.consumer: debug

生产者发送信息

image-20230723161609896

    /**
     * Publishes a fixed test message to {@code normal_exchange_test} with a per-message
     * expiration, so that it is delayed by {@code delayTime} milliseconds.
     *
     * <p>The {@link CorrelationData} is passed to the template so that, when publisher
     * confirms are enabled, the confirm for this message can be matched by its id.
     * Previously it was created but never passed to the send call.
     */
    @Override
    public void delaySendMessage() {
        String msg = "hello delay";
        int delayTime = 5000;
        // Unique id used to correlate the broker's confirm with this message.
        CorrelationData data = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("normal_exchange_test", "normal_routing_test", msg,
                p -> {
                    // The expiration header is a string holding the TTL in milliseconds.
                    p.getMessageProperties().setExpiration(String.valueOf(delayTime));
                    return p;
                },
                data);

        log.debug("发送一条消息{},当前时间:{},延迟{}秒", msg, new Date(), delayTime / 1000);
    }
}

消费者消费

   /**
    * Consumes messages from {@code dlx_queue_test} and logs the payload with the
    * time of receipt.
    *
    * <p>The body is a {@code byte[]}; logging it directly prints the array identity
    * (e.g. {@code [B@1a2b3c}) instead of the content, so it is decoded first.
    *
    * @param message the raw AMQP message delivered by the listener container
    */
   @RabbitListener(queues = "dlx_queue_test")
    public void delayConsume(Message message){
      // NOTE(review): assumes the payload is UTF-8 text - confirm against the producer's converter.
      String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
      log.debug("消费者消费信息:{},当前时间:{}",body,new Date());
    }

延迟队列插件安装

访问官网

Community Plugins — RabbitMQ

image-20230619214424612

image-20230619214539126

进入rabbitmq docker容器

[root@localhost ~]# docker exec -it rabbitmq bash

查询插件列表是否存在延迟插件

root@6d2342d51b11:/plugins# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@6d2342d51b11
 |/
[  ] rabbitmq_amqp1_0                  3.9.11
[  ] rabbitmq_auth_backend_cache       3.9.11
[  ] rabbitmq_auth_backend_http        3.9.11
[  ] rabbitmq_auth_backend_ldap        3.9.11
[  ] rabbitmq_auth_backend_oauth2      3.9.11
[  ] rabbitmq_auth_mechanism_ssl       3.9.11
[  ] rabbitmq_consistent_hash_exchange 3.9.11
[  ] rabbitmq_event_exchange           3.9.11
[  ] rabbitmq_federation               3.9.11
[  ] rabbitmq_federation_management    3.9.11
[  ] rabbitmq_jms_topic_exchange       3.9.11
[E*] rabbitmq_management               3.9.11
[e*] rabbitmq_management_agent         3.9.11
[  ] rabbitmq_mqtt                     3.9.11
[  ] rabbitmq_peer_discovery_aws       3.9.11
[  ] rabbitmq_peer_discovery_common    3.9.11
[  ] rabbitmq_peer_discovery_consul    3.9.11
[  ] rabbitmq_peer_discovery_etcd      3.9.11
[  ] rabbitmq_peer_discovery_k8s       3.9.11
[E*] rabbitmq_prometheus               3.9.11
[  ] rabbitmq_random_exchange          3.9.11
[  ] rabbitmq_recent_history_exchange  3.9.11
[  ] rabbitmq_sharding                 3.9.11
[  ] rabbitmq_shovel                   3.9.11
[  ] rabbitmq_shovel_management        3.9.11
[  ] rabbitmq_stomp                    3.9.11
[  ] rabbitmq_stream                   3.9.11
[  ] rabbitmq_stream_management        3.9.11
[  ] rabbitmq_top                      3.9.11
[  ] rabbitmq_tracing                  3.9.11
[  ] rabbitmq_trust_store              3.9.11
[e*] rabbitmq_web_dispatch             3.9.11
[  ] rabbitmq_web_mqtt                 3.9.11
[  ] rabbitmq_web_mqtt_examples        3.9.11
[  ] rabbitmq_web_stomp                3.9.11
[  ] rabbitmq_web_stomp_examples       3.9.11

下载支持3.9.x的插件

image-20230619215207816

退出容器:

root@6d2342d51b11:/plugins# exit
exit

上传到linux服务器

在/usr/local/software/下创建文件夹rabbitmq/plugins

[root@localhost software]# mkdir -p rabbitmq/plugins
image-20230619215427865

拷贝插件到容器中

[root@localhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins

进入容器启用插件

[root@localhost plugins]# docker  exec -it rabbitmq bash
root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

打开管理页面

image-20230619220041631

代码实现

配置类
package com.wnhz.rabbitmq.mq.config;

/**
 * Names used when declaring and addressing the delayed-message exchange and its queue.
 */
public interface RabbitmqConstants {

    /** Exchange type string passed when declaring the custom exchange. */
    String DELAYX_EXCHANGE_TYPE = "x-delayed-message";

    /** Name of the delayed exchange. */
    String DELAYX_EXCHANGE = "mq_delayx__exchange";

    /** Routing key binding the delayed exchange to its queue. */
    String DELAYX_ROUTING_KEY = "mq_delayx_routing_key";

    /** Name of the queue bound to the delayed exchange. */
    String DELAYX_QUEUE = "mq_delayx__queue";
}
package com.wnhz.rabbitmq.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.MessageConverter;

import java.util.HashMap;

/**
 * Declares the queue, custom exchange and binding for the delayed-message exchange,
 * plus a {@link RabbitTemplate} that serializes payloads as JSON.
 */
@Configuration
@Slf4j
public class RabbitmqConfig {

    /** @return the queue that receives messages from the delayed exchange */
    @Bean
    public Queue delayxQueue() {
        return new Queue(RabbitmqConstants.DELAYX_QUEUE);
    }

    /**
     * Declares the exchange of type {@link RabbitmqConstants#DELAYX_EXCHANGE_TYPE}.
     *
     * @return a durable, non-auto-delete custom exchange with {@code x-delayed-type=direct}
     */
    @Bean
    public CustomExchange delayRoutingExchange() {
        // A plain map instead of double-brace initialization: the latter creates an
        // anonymous HashMap subclass that keeps a reference to this configuration instance.
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(RabbitmqConstants.DELAYX_EXCHANGE,
                RabbitmqConstants.DELAYX_EXCHANGE_TYPE,
                true,
                false,
                arguments);
    }

    /** @return the binding of the delay queue to the delayed exchange, without arguments */
    @Bean
    public Binding delayxBinding() {
        return BindingBuilder.bind(delayxQueue())
                .to(delayRoutingExchange())
                .with(RabbitmqConstants.DELAYX_ROUTING_KEY).noargs();
    }

    /** @return a Jackson-based JSON message converter */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Builds the template used for publishing.
     *
     * @param connectionFactory the broker connection factory
     * @return a template configured with the JSON message converter
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        log.debug("rabbitmq配置:{}完成", rabbitTemplate);
        return rabbitTemplate;
    }
}

生产者
/**
 * Producer that publishes {@code User} payloads through the delayed exchange.
 */
@Service
@Slf4j
public class ProduceServiceImpl implements IProduceService {

    /** Delay set on each published message; logged in seconds after dividing by 1000. */
    private static final int DELAY_MILLIS = 10000;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends the user to {@link RabbitmqConstants#DELAYX_EXCHANGE} with the delay
     * property set, then logs the send.
     *
     * @param user the payload to publish
     */
    @Override
    public void sendDelayxUser(User user) {
        rabbitTemplate.convertAndSend(
                RabbitmqConstants.DELAYX_EXCHANGE,
                RabbitmqConstants.DELAYX_ROUTING_KEY,
                user,
                outgoing -> {
                    outgoing.getMessageProperties().setDelay(DELAY_MILLIS);
                    return outgoing;
                });
        log.debug("发送消息:{},发送时间:{},延迟:{}秒", user, new Date(), DELAY_MILLIS / 1000);
    }
}
消费者
/**
 * Consumer for the queue bound to the delayed exchange.
 */
@Slf4j
@Service
public class ConsumeServiceImpl implements IConsumeService {

    /**
     * Logs each received user together with the time of receipt.
     *
     * @param user the payload converted from the incoming message
     */
    @Override
    @RabbitListener(queues = RabbitmqConstants.DELAYX_QUEUE)
    public void receiveDelayxUser(User user) {
        final Date receivedAt = new Date();
        log.debug("消费者:接收到消息-->{},接收时间:{}", user, receivedAt);
    }
}
举报

相关推荐

0 条评论