0
点赞
收藏
分享

微信扫一扫

java实现延迟队列Delayed

菜菜捞捞 2021-09-30 阅读 78
杨文来

延迟队列的模型是个生产者消费者模型
1.先创建实体,实现Delayed接口

public class ItemVo<T> implements Delayed {

    /**
     * 到期时间 单位:ms
     */
    private long activeTime;
    /**
     * 订单实体(使用泛型是因为后续扩展其他业务共用此业务类)
     */
    private T data;

    public ItemVo(long activeTime, T data) {
        // 将传入的时间转换为超时的时刻
        this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime, TimeUnit.MILLISECONDS)
                + System.nanoTime();
        this.data = data;
    }

    public long getActiveTime() {
        return activeTime;
    }
    public T getData() {
        return data;
    }

    /**
     *  按照剩余时间进行排序
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        // 订单剩余时间-当前传入的时间= 实际剩余时间(单位纳秒)
        long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        // 根据剩余时间判断等于0 返回1 不等于0
        // 有可能大于0 有可能小于0  大于0返回1  小于返回-1
        return (d == 0) ? 0 : ((d > 0) ? 1 : -1);
    }

    /**
     *  获取剩余时间
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
//        // 剩余时间= 到期时间-当前系统时间,系统一般是纳秒级的,所以这里做一次转换
        long d = unit.convert(activeTime-System.nanoTime(), TimeUnit.NANOSECONDS);
        return d;
    }

2.消费者,实现Runnable接口,消费者有个属性是我们的Delaqueue队列,我们可以在run方法中消费这个队列中的任务

public class PutOrder implements Runnable {

    /**
     * 使用DelayQueue:一个使用优先级队列实现的无界阻塞队列。
     */
    private DelayQueue<ItemVo<Order>> queue;

    public PutOrder(DelayQueue<ItemVo<Order>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            ItemVo<Order> take = queue.take();
            String orderId = take.getData().getOrderId();
            System.out.println("开始检查订单:" + orderId + ",线程:" + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3.生产者

/**
     *  模仿下单之后将订单放入延迟队列
     *  延迟队列交给线程池去消费
     *  消费者可以自定义订单的消费 例如:订单未支付取消订单
     */
    @GetMapping("createOrder2")
    public Long createOrder2() throws InterruptedException {
        // 创建订单实体
        Order tb001 = new Order("tb001", new BigDecimal(9.9));
        // 创建队列中的业务类,参数一是延迟时间,参数二是实体
        ItemVo<Order> itemVoTb = new ItemVo<Order>(1000 * 10, tb001);
        // 创建延迟队列
        DelayQueue<ItemVo<Order>> queue = new DelayQueue<ItemVo<Order>>();
        // 将业务类放入延迟队列
        queue.offer(itemVoTb);
        // 创建线程任务
        Thread task = new Thread(new PutOrder(queue));
        // 将任务提交线程池
        executorService.execute(task);
        return 001L;

    }
举报

相关推荐

0 条评论