0
点赞
收藏
分享

微信扫一扫

商城电商day14 消息队列工具 RabbitMQ

月半小夜曲_ 2022-04-04 阅读 35
java

消息队列

标题三、消息队列工具 RabbitMQ

4.1.2 发送确认

有时,业务处理成功,消息也发了,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,即要求rabbitmq显式告知我们消息是否已成功发送。

4.1.3 手动消费确认

有时,消息被正确投递到消费方,但是消费方处理失败,那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分,但是积分消费方处理却都失败了,用户就会问:我购买了东西为什么积分并没有增加呢?

要解决这个问题,需要引入消费方确认,即只有消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack

4.2.4 封装发送消息确认

步骤
1 MQProducerAckConfig 实现两个接口 RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback
2 public void init() 初始化两个 一个是消息是否正确到达 Exchange 另一个是消息没有正确到达队列时触发回调

package com.atguigu.gmall.common.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 封装发送端消息确认
 */
@Component
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {


    //  发送消息: RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;



    //  写一个方法
    //  修饰一个非静态的void()方法,在服务器加载Servlet的时候运行,
    //  并且只会被服务器执行一次在构造函数之后执行,init()方法之前执行。
    @PostConstruct
    public void init(){
//        只确认消息是否正确到达 Exchange 中
        rabbitTemplate.setConfirmCallback(this);
//        消息没有正确到达队列时触发回调,如果正确到达队列不执行
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     *  <p>
     *   1. 如果消息没有到exchange,则confirm回调,ack=false
     *   2. 如果消息到达exchange,则confirm回调,ack=true
     *   3. exchange到queue成功,则不回调return
     *   4. exchange到queue失败,则回调return
     *
     *
     */

    /**
     * 消息成功发送到交换机上
     * @param correlationData 数据载体带有Id标识的!
     * @param ack   消息是否发送成功
     * @param cause 消息发送失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            //  消息成功发送到了交换机
            System.out.println("消息发送成功!");
        }else {
            System.out.println("消息发送异常");
        }
    }

    /**
     * 表示消息如果没有成功发送到队列则会执行当前这个方法!
     * @param message 消息主体
     * @param replyCode 应答码
     * @param replyText 描述
     * @param exchange 消息使用的交换器
     * @param routingKey 消息使用的路由键
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息主体: " + new String(message.getBody()));
        System.out.println("应答码: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交换器 exchange : " + exchange);
        System.out.println("消息使用的路由键 routing : " + routingKey);
    }
}

4.2.5 封装消息发送工具类 供调用

package com.atguigu.gmall.common.service;


@Service
public class RabbitService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *  发送消息
     * @param exchange 交换机
     * @param routingKey 路由键
     * @param message 消息
     */
    public boolean sendMessage(String exchange, String routingKey, Object message) {
 
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        return true;
    }
   
}

4.2.6 测试 发送消息 确认消息

消息发送端

package com.atguigu.gmall.mq.controller;


@RestController
@RequestMapping("/mq")
@Slf4j
public class MqController {


   @Autowired
   private RabbitService rabbitService;


   /**
    * 消息发送
    */
   //http://cart.gmall.com/8282/mq/sendConfirm
   @GetMapping("sendConfirm")
   public Result sendConfirm() {
      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      rabbitService.sendMessage("exchange.confirm", "routing.confirm", sdf.format(new Date()));
      return Result.ok();
   }
}

消息接收端

package com.atguigu.gmall.mq.receiver;


@Component
@Configuration
public class ConfirmReceiver {

@SneakyThrows
@RabbitListener(bindings=@QueueBinding(
        value = @Queue(value = "queue.confirm",autoDelete = "false"),
        exchange = @Exchange(value = "exchange.confirm",autoDelete = "true"),
        key = {"routing.confirm"}))
public void process(Message message, Channel channel){
    System.out.println("RabbitListener:"+new String(message.getBody()));

    // 采用手动应答模式, 手动确认应答更为安全稳定
    //如果手动确定了,再出异常,mq不会通知;如果没有手动确认,抛异常mq会一直通知
    //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        // false 确认一个消息,true 批量确认
     channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
}
}

4.3改造商品搜索上下架

4.3.1 定义商品上下架常量

在rabbit-util模块中导入常量类MqConst。

/**
 * 商品上下架
 */
public static final String EXCHANGE_DIRECT_GOODS = "exchange.direct.goods";
public static final String ROUTING_GOODS_UPPER = "goods.upper";
public static final String ROUTING_GOODS_LOWER = "goods.lower";
//队列
public static final String QUEUE_GOODS_UPPER  = "queue.goods.upper";
public static final String QUEUE_GOODS_LOWER  = "queue.goods.lower";

4.3.3 service-product发送消息

我在商品上架与商品添加时发送消息 就是更新Good 不止更新skuInfoUp.setIsSale(1);
商品上架

原来的

    @Override
    @Transactional
    public void onSale(Long skuId) {
        // 更改销售状态
        SkuInfo skuInfoUp = new SkuInfo();
        skuInfoUp.setId(skuId);
        skuInfoUp.setIsSale(1);
        skuInfoMapper.updateById(skuInfoUp);
        //商品上架
        //rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_GOODS, MqConst.ROUTING_GOODS_UPPER, skuId);

    }
实现类
@Override
@Transactional
public void onSale(Long skuId) {
    // 更改销售状态
    SkuInfo skuInfoUp = new SkuInfo();
    skuInfoUp.setId(skuId);
    skuInfoUp.setIsSale(1);
    skuInfoMapper.updateById(skuInfoUp);

    //商品上架
    rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_GOODS, MqConst.ROUTING_GOODS_UPPER, skuId);
}

商品下架

实现类
@Override
@Transactional
public void cancelSale(Long skuId) {
    // 更改销售状态
    SkuInfo skuInfoUp = new SkuInfo();
    skuInfoUp.setId(skuId);
    skuInfoUp.setIsSale(0);
    skuInfoMapper.updateById(skuInfoUp);

    //商品下架
    rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_GOODS, MqConst.ROUTING_GOODS_LOWER, skuId);
}

4.3.4 service-list 消费消息 消费上架下架消息 更新商品

package com.atguigu.gmall.list.receiver;


@Component
public class ListReceiver {

    @Autowired
    private SearchService searchService;

    /**
     * 商品上架
     * @param skuId
     * @throws IOException
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MqConst.QUEUE_GOODS_UPPER, durable = "true"),
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_GOODS, type = ExchangeTypes.DIRECT, durable = "true"),
            key = {MqConst.ROUTING_GOODS_UPPER}
    ))
    public void upperGoods(Long skuId, Message message, Channel channel) throws IOException {
        if (null != skuId) {
            searchService.upperGoods(skuId);
        }
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    /**
     * 商品下架
        * @param skuId
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MqConst.QUEUE_GOODS_LOWER, durable = "true"),
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_GOODS, type = ExchangeTypes.DIRECT, durable = "true"),
            key = {MqConst.ROUTING_GOODS_LOWER}
    ))
    public void lowerGoods(Long skuId, Message message, Channel channel) throws IOException {
        if (null != skuId) {
            searchService.lowerGoods(skuId);
        }
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

商品上下架消息队列 调用的更新Goods的方法


    @Override
    public void upperGoods(Long skuId) {
        //  声明一个Goods 对象
        Goods goods = new Goods();

        //  异步编排!
        SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);

        if (skuInfo!=null){
            goods.setId(skuId);
            goods.setDefaultImg(skuInfo.getSkuDefaultImg());
            goods.setTitle(skuInfo.getSkuName());
            goods.setPrice(skuInfo.getPrice().doubleValue());
            goods.setCreateTime(new Date());

            //  赋值品牌数据
            BaseTrademark trademark = productFeignClient.getTrademark(skuInfo.getTmId());
            goods.setTmId(skuInfo.getTmId());
            goods.setTmName(trademark.getTmName());
            goods.setTmLogoUrl(trademark.getLogoUrl());

            //  赋值分类数据:
            BaseCategoryView categoryView = productFeignClient.getCategoryView(skuInfo.getCategory3Id());
            goods.setCategory1Id(categoryView.getCategory1Id());
            goods.setCategory2Id(categoryView.getCategory2Id());
            goods.setCategory3Id(categoryView.getCategory3Id());
            goods.setCategory1Name(categoryView.getCategory1Name());
            goods.setCategory2Name(categoryView.getCategory2Name());
            goods.setCategory3Name(categoryView.getCategory3Name());

            //  赋值平台属性集0
            List<BaseAttrInfo> attrList = productFeignClient.getAttrList(skuId);
            //  Function R apply(T t)
            //  Stream() 流式编程
            List<SearchAttr> searchAttrList = attrList.stream().map(baseAttrInfo -> {
                //  创建一个对象
                SearchAttr searchAttr = new SearchAttr();
                searchAttr.setAttrId(baseAttrInfo.getId());
                searchAttr.setAttrName(baseAttrInfo.getAttrName());
                searchAttr.setAttrValue(baseAttrInfo.getAttrValueList().get(0).getValueName());
                return searchAttr;
            }).collect(Collectors.toList());

            goods.setAttrs(searchAttrList);
        }

        goodsRepository.save(goods);
    }

    @Override
    public void lowerGoods(Long skuId) {
        //  根据id 删除
        goodsRepository.deleteById(skuId);
    }

五、延迟消息

延迟消息有两种实现方案:
1,基于死信队列
2,集成延迟插件

5.1基于死信实现延迟消息

使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现延迟队列

5.1.1 消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
如何设置TTL:
我们创建一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是毫秒),那所在压在这个队列的消息在5秒后会消失。

5.1.2 死信交换机 Dead Letter Exchanges

一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
(1) 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
(2)上面的消息的TTL到了,消息过期了。
(3)队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

5.1.3 代码实现

我们现在可以测试一下延迟队列。
(1)创建死信队列
(2)创建交换机
(3)建立交换器与队列之间的绑定
(4)创建队列

5.1.3.1 在service-mq 中添加配置类

package com.atguigu.gmall.mq.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterMqConfig {
    // 声明一些变量

       public static final String exchange_dead = "exchange.dead";
    public static final String routing_dead_1 = "routing.dead.1";
    public static final String routing_dead_2 = "routing.dead.2";
    public static final String queue_dead_1 = "queue.dead.1";
    public static final String queue_dead_2 = "queue.dead.2";

    // 定义交换机
    @Bean
    public DirectExchange exchange(){
        return new DirectExchange(exchange_dead,true,false,null);
    }

    @Bean
    public Queue queue1(){
        // 设置如果队列一 出现问题,则通过参数转到exchange_dead,routing_dead_2 上!
        HashMap<String, Object> map = new HashMap<>();
        // 参数绑定 此处的key 固定值,不能随意写
        map.put("x-dead-letter-exchange",exchange_dead);
        map.put("x-dead-letter-routing-key",routing_dead_2);
        // 设置延迟时间
        map.put("x-message-ttl", 10 * 1000);
        // 队列名称,是否持久化,是否独享、排外的【true:只可以在本次连接中访问】,是否自动删除,队列的其他属性参数
        return new Queue(queue_dead_1,tr8
`ue,false,false,map);
    }

    @Bean
    public Binding binding(){
        // 将队列一 通过routing_dead_1 key 绑定到exchange_dead 交换机上
        return BindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1);
    }

    // 这个队列二就是一个普通队列
    @Bean
    public Queue queue2(){
        return new Queue(queue_dead_2,true,false,false,null);
    }

    // 设置队列二的绑定规则
    @Bean
    public Binding binding2(){
        // 将队列二通过routing_dead_2 key 绑定到exchange_dead交换机上!
        return BindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2);
    }
}

5.1.3.2 配置发送消息

package com.atguigu.gmall.mq.controller;


@RestController
@RequestMapping("/mq")
@Slf4j
public class MqController {

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @Autowired
   private RabbitService rabbitService;

 @GetMapping("sendDeadLettle")
   public Result sendDeadLettle() {
      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     this.rabbitTemplate.convertAndSend(DeadLetterMqConfig.exchange_dead, DeadLetterMqConfig.routing_dead_1, "ok");
      System.out.println(sdf.format(new Date()) + " Delay sent.");
      return Result.ok();
   }
}

5.1.3.3消息接收方

为什么 接收的是队列2 queue_dead_2
发送的是队列 1 routing_dead_1

package com.atguigu.gmall.mq.receiver;


@Component
@Configuration
public class DeadLetterReceiver {

    @RabbitListener(queues = DeadLetterMqConfig.queue_dead_2)
    public void get(String msg) {
        System.out.println("Receive:" + msg);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Receive queue_dead_2: " + sdf.format(new Date()) + " Delay rece." + msg);
    }
}

5.2基于延迟插件实现延迟消息

5.2.2 代码实现

package com.atguigu.gmall.mq.config;

@Configuration
public class DelayedMqConfig {

    public static final String exchange_delay = "exchange.delay";
    public static final String routing_delay = "routing.delay";
    public static final String queue_delay_1 = "queue.delay.1";

    /**
     * 队列不要在RabbitListener上面做绑定,否则不会成功,如队列2,必须在此绑定
     *
     * @return
     */

    @Bean
    public Queue delayQeue1() {
        // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
        return new Queue(queue_delay_1, true);
    }

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(exchange_delay, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding delayBbinding1() {
        return BindingBuilder.bind(delayQeue1()).to(delayExchange()).with(routing_delay).noargs();
    }
}

发送消息

@GetMapping("sendDelay")
public Result sendDelay() {
   SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
   this.rabbitTemplate.convertAndSend(DelayedMqConfig.exchange_delay, DelayedMqConfig.routing_delay, sdf.format(new Date()), new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
         message.getMessageProperties().setDelay(10 * 1000);
         System.out.println(sdf.format(new Date()) + " Delay sent.");
         return message;
      }
   });
   return Result.ok();
}

接收消息

package com.atguigu.gmall.mq.receiver;


@Component
public class DelayReceiver {

    @RabbitListener(queues = DelayedMqConfig.queue_delay_1)
    public void get(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Receive queue_delay_1: " + sdf.format(new Date()) + " Delay rece." + msg);
    }

}

5.3基于延迟插件实现取消订单

service-order模块

5.3.1 业务配置与接口封装

rabbit-util模块配置常量MqConst

/**
 * 取消订单,发送延迟队列
 */
public static final String EXCHANGE_DIRECT_ORDER_CANCEL = "exchange.direct.order.cancel";//"exchange.direct.order.create" test_exchange;
public static final String ROUTING_ORDER_CANCEL = "order.create";
//延迟取消订单队列
public static final String QUEUE_ORDER_CANCEL  = "queue.order.cancel";
//取消订单 延迟时间 单位:秒
public static final int DELAY_TIME  = 10;

rabbit-util模块延迟接口封装

RabbitService
/**
 * 发送延迟消息
 * @param exchange 交换机
 * @param routingKey 路由键
 * @param message 消息
 * @param delayTime 单位:秒
 */
public boolean sendDelayMessage(String exchange, String routingKey, Object message, int delayTime) {

 
    this.rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setDelay(delayTime*1000);
            return message;
        }
    });
    return true;
}

取消订单配置类

package com.atguigu.gmall.order.receiver;


@Configuration
public class OrderCanelMqConfig {

    @Bean
    public Queue delayQueue() {
        // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
        return new Queue(MqConst.QUEUE_ORDER_CANCEL, true);
    }

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(MqConst.EXCHANGE_DIRECT_ORDER_CANCEL, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelay() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(MqConst.ROUTING_ORDER_CANCEL).noargs();
    }

}

5.3.3 发送消息

创建订单时,发送延迟消息
修改保存订单方法

@Override
@Transactional
public Long saveOrderInfo(OrderInfo orderInfo) {
    .....
    //发送延迟队列,如果定时未支付,取消订单
 rabbitService.sendDelayMessage(MqConst.EXCHANGE_DIRECT_ORDER_CANCEL, MqConst.ROUTING_ORDER_CANCEL, orderInfo.getId(), MqConst.DELAY_TIME);
    // 返回
    return orderInfo.getId();
}

详细代码

 @Override
    @Transactional(rollbackFor = Exception.class)
    public Long saveOrderInfo(OrderInfo orderInfo) {
        /*
        1.  orderInfo ,orderDetail
        需要手动设置:total_amount,order_status,user_id,out_trade_no,trade_body,create_time,expire_time,process_status
         */
        //  总金额:单价*数量
        orderInfo.sumTotalAmount(); // 自动赋值
        orderInfo.setOrderStatus(OrderStatus.UNPAID.name());
        //  在实现类中能否获取到userId? 在控制器获取!
        //  第三方交易编号要求唯一!
        String outTradeNo = "ATGUIGU" + System.currentTimeMillis() + "" + new Random().nextInt(1000);
        orderInfo.setOutTradeNo(outTradeNo);
        //  订单描述
        orderInfo.setTradeBody("有钱就想花,就是玩!");
        //  可以将送货清单中的skuName 进行拼接!放入TradeBody 中!
        orderInfo.setCreateTime(new Date());
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.DATE,1);
        orderInfo.setExpireTime(calendar.getTime()); // 默认24小时过期!

        orderInfo.setProcessStatus(ProcessStatus.UNPAID.name());
        orderInfoMapper.insert(orderInfo);
        //  获取到订单明细:
        List<OrderDetail> orderDetailList = orderInfo.getOrderDetailList();
        for (OrderDetail orderDetail : orderDetailList) {
            orderDetail.setOrderId(orderInfo.getId());
            orderDetailMapper.insert(orderDetail);
        }
        Long orderId = orderInfo.getId();

        //  发的内容是:订单Id
        rabbitService.sendDelayMessage(MqConst.EXCHANGE_DIRECT_ORDER_CANCEL,MqConst.ROUTING_ORDER_CANCEL,orderId,MqConst.DELAY_TIME);
        //  返回订单Id
        return orderId;
    }

5.3.4 接收消息

监听到后进行订单取消操作

package com.atguigu.gmall.order.receiver;


@Component
public class OrderReceiver {

    @Autowired
    private OrderService orderService;

    //  监听的消息
    @SneakyThrows
    @RabbitListener(queues = MqConst.QUEUE_ORDER_CANCEL)
    public void cancelOrder(Long orderId , Message message, Channel channel){
        //  判断当前订单Id 不能为空
        try {
            if (orderId!=null){
                //  发过来的是订单Id,那么你就需要判断一下当前的订单是否已经支付了。
                //  未支付的情况下:关闭订单
                //  根据订单Id 查询orderInfo select * from order_info where id = orderId
                //  利用这个接口IService  实现类ServiceImpl 完成根据订单Id 查询订单信息 ServiceImpl 类底层还是使用的mapper
                OrderInfo orderInfo = orderService.getById(orderId);
                //  判断支付状态,进度状态
                if (orderInfo!=null && "UNPAID".equals(orderInfo.getOrderStatus())
                        && "UNPAID".equals(orderInfo.getProcessStatus())){
                    //  关闭订单
                    //  int i = 1/0;
                    orderService.execExpiredOrder(orderId);
                }
            }
        } catch (Exception e) {
            //  消息没有正常被消费者处理:
            //  第一个参数就是long 标识, 第二个参数表示 是否批量确认 ,第三个参数是否重回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            //  发短信的形式通知我们的管理员,立刻查看并解决问题 类似于消息的积压!
            //  如果线上真的出现了这种问题,你能停止这个微服务改代码? 跟我刚刚的操作类似,启动一个其他的程序消费这个消息!
            e.printStackTrace();
        }
        //  手动确认消息 如果不确认,有可能会到消息残留。
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}


5.3.5 编写取消订单接口与实现类

/**
 * 处理过期订单
 * @param orderId
 */
void execExpiredOrder(Long orderId);

/**
 * 根据订单Id 修改订单的状态
 * @param orderId
 * @param processStatus
 */
void updateOrderStatus(Long orderId, ProcessStatus processStatus);


@Override
public void execExpiredOrder(Long orderId) {
    // orderInfo
    updateOrderStatus(orderId, ProcessStatus.CLOSED);
}
@Override
public void updateOrderStatus(Long orderId, ProcessStatus processStatus) {
    OrderInfo orderInfo = new OrderInfo();
    orderInfo.setId(orderId);
    orderInfo.setProcessStatus(processStatus.name());
    orderInfo.setOrderStatus(processStatus.getOrderStatus().name());
    orderInfoMapper.updateById(orderInfo);
}

举报

相关推荐

day14

Day14 IO

Oracle day14

Day14(String类)

day14学习总结

算法打卡Day14

java基础 Day14

网络运维Day14

0 条评论