0
点赞
收藏
分享

微信扫一扫

多线程消费顺序-01

janedaring 2021-09-28 阅读 105

场景:
1: 每个订单有 下单 支付 完成 3种状态
模拟10条数据,有的是有上述3种状态,有的没有

2:最终目的是 一个消费者拉取了全部数据,然后数据后台使用多线程,
保证同一个单号进入同一个队列,每个线程处理对应的队列数据。


3:通过构建消息队列和线程的一一绑定关系,以及单号个队列绑定关系。来实现同一个单号进入同一个队列

4: 存在的BUG

  • 如果宕机,内存队列里的数据就没了。需要增加的时候写到消息表
  • 如果下单消费失败了,后面的支付和完成是不能消费的,一定要等到有订单了才可以消费
    参考增加日志以及redis
    https://segmentfault.com/a/1190000018640106

消费者放入消息到队列

@Override
    public void consumer() {
        // 模拟上游传递过来的数据
        List<Thread01> thread01s = list();
        int size = thread01s.size();
        for (int i = 0; i < size; i++) {
            Thread01 thread01 = thread01s.get(i);
            System.out.println("当前获取的状态:"+thread01.toString());
            chooseQueue(thread01);
        }
    }

    /**
     * 通过单号选择进入那个队列中
     * @param thread01
     */
    private void chooseQueue(Thread01 thread01) {
        String shipId = thread01.getShipId();
         int index = Math.abs(shipId.hashCode()) % 3;
        putQueue(index,thread01);
    }

    private void putQueue(int index, Thread01 thread01)  {
        LinkedBlockingQueue<Thread01> queue = queueMap.get(index);
        try {
            queue.put(thread01);
            System.out.println(index+"-->"+queue.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

通过对单号进行HASH 决定进去那个队列。

事先设置好队列

队列下标和单号的hash算法获取index对应。

    static Map<Integer,LinkedBlockingQueue<Thread01>> queueMap=new HashMap();
    static{
        queueMap.put(0,new LinkedBlockingQueue<Thread01>(1000));
        queueMap.put(1,new LinkedBlockingQueue<Thread01>(1000));
        queueMap.put(2,new LinkedBlockingQueue<Thread01>(1000));
    }

循环消费数据。线程绑定队列

    public void doConsumer(int index){
        LinkedBlockingQueue<Thread01> queue = queueMap.get(index);
        int size = queue.size();
        while(queue.size()!=0){
//            System.out.println("队列下标:"+index+"-->队列大小:"+queue.size());
            Thread01 take = null;
            try {
                take = queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 业务逻辑处理
//            System.out.println("当前获取的状态:"+take.toString()+" 线程:"+Thread.currentThread().getName());
            Thread02 ship_id = thread02Service.getOne(new QueryWrapper<Thread02>().eq("ship_id", take.getShipId()));
            Thread02 t02=new Thread02();
            if(null==ship_id){
                t02.setShipId(take.getShipId());
                t02.setStatus(take.getStatus());
                thread02Service.save(t02);
            }else{
                thread02Service.update(new UpdateWrapper<Thread02>().eq("ship_id",take.getShipId()).set("status",take.getStatus()));
            }
        }
    }
    @PostConstruct
    public void doThread(){
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
             int  index=i;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("-----------初始化线程任务-----------------");
                    while(true){
                        doConsumer(index);
                    }
                }
            },"thread_"+index);
        }
    }

结果显示

举报

相关推荐

0 条评论