0
点赞
收藏
分享

微信扫一扫

龙芯2K1000实战开发-系统配置详解

窗外路过了谁 2023-06-01 阅读 73

延迟队列

背:也就是给队列设置个过期时间,然后到时间消息变成死信,消费死信队列中的消息就行,再没什么玩意,演示队列优化就是不给队列这只TTL,再生产者代码中消息里面设置消息TTL,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。所以就是消费顺序问题要安装个插件

延迟队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定是按处理的元素的队列

延迟队列使用场景

1. 订单在十分钟之内未支付则自动取消

2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。

4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。

5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

设置TTL的两种方式:

1.队列设置TTL

在创建对别的收设置队列的x-message-ttl属性,例如

Map<String, Object> map = new HashMap<>();
//设置队列有效期为10秒
map.put("x-message-ttl",10000);
channel.queueDeclare(queueName,durable,exclusive,autoDelete,map);

消息设置TTL
对每条消息设置TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
 channel.basicPublish(exchangeName,routingKey,mandatory,properties,"msg body".getBytes());
两者之间的区别:
1.如果设置了队列的TTL属性,那么一旦消息过期,就会被队列的丢弃
2.如果是消息设置了TTL属性,那么即使消息过期,也不一定会马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息挤压情况,那么已
经过期的消息也许还能存活较长时间
3.如果我们没有设置TTL,就表示消息永远不会过期,如果TTL设置为0,则表示除非此时可以直接投递到消费者,否则该消息会被丢弃

整合springboot

添加依赖

 <!--RabbitMQ 依赖-->
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.47</version>
 </dependency>
 <dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 </dependency>
 <!--swagger-->
 <dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger2</artifactId>
 <version>2.9.2</version>
 </dependency>
 <dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger-ui</artifactId>
 <version>2.9.2</version>
 </dependency>
 <!--RabbitMQ 测试依赖-->
 <dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit-test</artifactId>
 <scope>test</scope>
 </dependency>
</dependencies>

 修改配置文件

spring.rabbitmq.host=182.92.234.71
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

添加swagger配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
 @Bean
 public Docket webApiConfig(){
 return new Docket(DocumentationType.SWAGGER_2)
 .groupName("webApi")
 .apiInfo(webApiInfo())
 .select()
 .build();
 }
 private ApiInfo webApiInfo(){
 return new ApiInfoBuilder()
 .title("rabbitmq 接口文档")
 .description("本文档描述了 rabbitmq 微服务接口定义")
 .version("1.0")
 .contact(new Contact("enjoy6288", "http://atguigu.com", 
"1551388580@qq.com"))
 .build();
 }
}

队列TTL

代码架构图

创建两个队列 QA QB ,两者队列 TTL 分别设置为 10S 40S ,然后在创建一个交换机 X 和死信交

换机 Y ,它们的类型都是 direct ,创建一个死信队列 QD ,它们的绑定关系如下:

配置文件类代码

@Configuration
public class TtlQueueConfig {
 public static final String X_EXCHANGE = "X";
 public static final String QUEUE_A = "QA";
 public static final String QUEUE_B = "QB";
 public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
 public static final String DEAD_LETTER_QUEUE = "QD";
 // 声明 xExchange
 @Bean("xExchange")
 public DirectExchange xExchange(){
 return new DirectExchange(X_EXCHANGE);
 }
 // 声明 xExchange
 @Bean("yExchange")
 public DirectExchange yExchange(){
 return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
 }
 //声明队列 A ttl 为 10s 并绑定到对应的死信交换机
 @Bean("queueA")
 public Queue queueA(){
 Map<String, Object> args = new HashMap<>(3);
 //声明当前队列绑定的死信交换机
 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
 //声明当前队列的死信路由 key
 args.put("x-dead-letter-routing-key", "YD");
 //声明队列的 TTL
 args.put("x-message-ttl", 10000);
 return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
 }
 // 声明队列 A 绑定 X 交换机
 @Bean
 public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
 @Qualifier("xExchange") DirectExchange xExchange){
 return BindingBuilder.bind(queueA).to(xExchange).with("XA");
 }
 //声明队列 B ttl 为 40s 并绑定到对应的死信交换机
 @Bean("queueB")
 public Queue queueB(){
 Map<String, Object> args = new HashMap<>(3);
 //声明当前队列绑定的死信交换机
 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
 //声明当前队列的死信路由 key
 args.put("x-dead-letter-routing-key", "YD");
 //声明队列的 TTL
 args.put("x-message-ttl", 40000);
 return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
 }
 //声明队列 B 绑定 X 交换机
 @Bean
 public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
 @Qualifier("xExchange") DirectExchange xExchange){
 return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
 }
 //声明死信队列 QD
 @Bean("queueD")
 public Queue queueD(){
 return new Queue(DEAD_LETTER_QUEUE);
 }
 //声明死信队列 QD 绑定关系
 @Bean
 public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
 @Qualifier("yExchange") DirectExchange yExchange){
 return BindingBuilder.bind(queueD).to(yExchange).with("YD");
 }
}

 消息生产类代码

@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
 @Autowired
 private RabbitTemplate rabbitTemplate;
 @GetMapping("sendMsg/{message}")
 public void sendMsg(@PathVariable String message){
 log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
 rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message);
 rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message);
 } 
}

消费者消费代码

@Component
public class DeadLetterQueueConsumer {
 @RabbitListener(queues = "QD")
 public void receiveD(Message message, Channel channel) throws IOException {
 String msg = new String(message.getBody());
 log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
 }
}

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是 每增加一个新的时间需求,就要新增一个队列 ,这里只有 10S 40S

两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然

后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

延迟队列优化(队列不设置TTL时间)

代码架构图:

在这里新增了一个队列 QC, 绑定关系如下 , 该队列不设置 TTL 时间

配置文件代码类:

@Component
public class MsgTtlQueueConfig {
 public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
 public static final String QUEUE_C = "QC";
 //声明队列 C 死信交换机
 @Bean("queueC")
 public Queue queueB(){
 Map<String, Object> args = new HashMap<>(3);
 //声明当前队列绑定的死信交换机
 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
 //声明当前队列的死信路由 key
 args.put("x-dead-letter-routing-key", "YD");
 //没有声明 TTL 属性
 return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
 }
 //声明队列 B 绑定 X 交换机
 @Bean
 public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
 @Qualifier("xExchange") DirectExchange xExchange){
 return BindingBuilder.bind(queueC).to(xExchange).with("XC");
 }
}

消息生产者代码

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
 rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
 correlationData.getMessageProperties().setExpiration(ttlTime);
 return correlationData;
 });
 log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
}

发起请求

http://localhost:8080/ttl/sendExpirationMsg/ 你好 1/20000

http://localhost:8080/ttl/sendExpirationMsg/ 你好 2/2000

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过

,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

举报

相关推荐

0 条评论