0
点赞
收藏
分享

微信扫一扫

阻塞队列

彪悍的鼹鼠 2021-09-30 阅读 102
高并发

阻塞队列,顾名思义,首先它是个队列,而一个阻塞队列在数据结构中所起的作用大致如下:


当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
当阻塞队列是满时,向队列中添加元素的操作将会被阻塞

试图从空队列中获取元素的线程会被阻塞,直到其他线程往空队列中插入新元素
同样
试图往已满的队列中添加新元素的线程同样也会被阻塞,直到其他线程从队列中移除了元素

真实例子:
如上图,假如阻塞队列是一个柜台,线程1是做蛋糕的师傅,线程2是买蛋糕的顾客
当柜台中没有蛋糕的时候,蛋糕师傅要做蛋糕,而顾客却需要排队等着,直到蛋糕师傅做出了蛋糕放到了柜台,那么顾客就可以买蛋糕了
当柜台已经被蛋糕装满了,蛋糕师傅做好了蛋糕却放不进柜台中,而顾客可以继续买蛋糕,直到柜台不满的时候,蛋糕师傅又可以继续做蛋糕放进柜台里了

好处

多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦满足条件,被挂起的线程又会自动被唤醒

使用阻塞队列后,就不需要开发人员自己考虑挂起和唤醒的问题,可以更专心在逻辑和业务方面

UML

给大家看一下这个大家族


1、ArrayBlockingQueue:有数组结构组成的有界阻塞队列
2、LinkedBlockingQueue:由链表结构组成的有界(但默认值是Integer.MAX_VALUE)阻塞队列
3、PriorityBlockingQueue:支持优先级排序的无界阻塞队列
4、DelayQueue:使用优先级队列实现的延迟无界队列
5、SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
6、LinkedTransferQueue:由链表结构组成的无界阻塞队列
7、LinkedBlockingDeque:由链表结构组成的双向阻塞队列

BlockQueue的核心方法

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
现象 解析
抛出异常 当阻塞队列满时,再往队列里add插入元素,会抛IllegalStateException: Queue full
当阻塞队列空时,再从队列里remove移除元素回抛NoSuchElementException
特殊值 插入方法,成功true失败false
移除方法,成功返回出队列的元素,队列里面没有就返回null
一直阻塞 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产线程知道put数据or响应中断退出
当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程知道队列可用
超时退出 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过后生产者线程会退出

用在哪

1、生产者消费者模式
2、线程池
3、消息中间件

先来回顾一下简单的生产消费模式

class ShareData {
    private int num = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() {
        try {
            lock.lock();
            // 判断 注意一定要用while进行判断,否则会出现虚假唤醒的情况
            while (num != 0) {
                // 等待,不能生产
                condition.await();
            }
            // 干活
            num++;
            System.out.println(Thread.currentThread().getName() + "\t" + num);
            // 通知唤醒
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void decrement() {
        try {
            lock.lock();
            // 判断
            while (num == 0) {
                // 等待,不能生产
                condition.await();
            }
            // 干活
            num--;
            System.out.println(Thread.currentThread().getName() + "\t" + num);
            // 通知唤醒
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

/**
 * 题目:一个初始值为零的变量,两个线程对其交替操作,一个加1一个减1,执行五轮
 */
public class ProdConsumerTraditionTest {
    public static void main(String[] args) {
        ShareData shareData = new ShareData();

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                shareData.increment();
            }
        }, "t1").start();

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                shareData.decrement();
            }
        }, "t2").start();
    }
}

注意:一定要用while进行判断,否则会出现虚假唤醒的情况

下面我们回到蛋糕店的例子,模拟个场景,使用阻塞队列来实现
假设现在蛋糕店在做活动,老板开启了秒杀活动,在五秒的时间内,蛋糕师傅做蛋糕,顾客买蛋糕,知道老板喊停,那么蛋糕师傅休息,顾客也不买了

class CakeShop {
    // 是否开启活动,默认开启,进行生产+消费
    private volatile boolean flag = true;

    // 模拟蛋糕
    private AtomicInteger num = new AtomicInteger();

    // 展台,使用阻塞队列,通过构造器传入不同类型
    private BlockingQueue<String> blockingQueue = null;

    public CakeShop(BlockingQueue<String> blockingQueue) {
        System.out.println(blockingQueue.getClass().getName());
        this.blockingQueue = blockingQueue;
    }

    public void prod() throws InterruptedException {
        String cake = null;
        boolean result;
        while (flag) {
            cake = String.valueOf(num.incrementAndGet());
            // 模拟做蛋糕的时间
            TimeUnit.SECONDS.sleep(1);
            // 将蛋糕放入展柜
            result = blockingQueue.offer(cake);
            if (result) {
                System.out.println(Thread.currentThread().getName() + "\t 做蛋糕,插入队列" + cake + "成功");
            } else {
                System.out.println(Thread.currentThread().getName() + "\t 做蛋糕,插入队列" + cake + "失败");
            }
        }
        System.out.println(Thread.currentThread().getName() + "\t 老板喊停,表示flag = false,生产动作结束");
    }

    public void consumer() throws InterruptedException {
        String data = null;
        while (flag) {
            // 从展柜拿蛋糕,但是等了两秒都没买到的话就不买了
            data = blockingQueue.poll(2L, TimeUnit.SECONDS);

            if (null == data) {
                flag = false;
                System.out.println(Thread.currentThread().getName() + "\t 超过两秒没有买到,消费退出");
                return;
            }
            System.out.println(Thread.currentThread().getName() + "\t 买蛋糕, 消费队列" + data + "成功");

        }
        System.out.println(Thread.currentThread().getName() + "\t 活动停了,表示flag = false,生产动作结束");
    }

    public void stop() {
        this.flag = false;
    }
}

/**
 * volatile/CAS/atomicInteger/BlockQueue/线程交互/原子引用
 */
public class ProdConsumerBlockingQueueTest {
    public static void main(String[] args) {
        CakeShop cakeShop = new CakeShop(new ArrayBlockingQueue<>(10));

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t 生产线程启动");
            try {
                cakeShop.prod();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "蛋糕师傅").start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
            try {
                cakeShop.consumer();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "顾客").start();

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "\t 5秒钟到,老板叫停");
        cakeShop.stop();
    }
}


内容均来源于学习资料,在学习过程中进行记录,如有侵权联系作者进行删除

Change the world by program

举报

相关推荐

0 条评论