阻塞队列,顾名思义,首先它是个队列,而一个阻塞队列在数据结构中所起的作用大致如下:
当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
当阻塞队列是满时,向队列中添加元素的操作将会被阻塞
试图从空队列中获取元素的线程会被阻塞,直到其他线程往空队列中插入新元素
同样
试图往已满的队列中添加新元素的线程同样也会被阻塞,直到其他线程从队列中移除了元素
真实例子:
如上图,假如阻塞队列是一个柜台,线程1是做蛋糕的师傅,线程2是买蛋糕的顾客
当柜台中没有蛋糕的时候,蛋糕师傅要做蛋糕,而顾客却需要排队等着,直到蛋糕师傅做出了蛋糕放到了柜台,那么顾客就可以买蛋糕了
当柜台已经被蛋糕装满了,蛋糕师傅做好了蛋糕却放不进柜台中,而顾客可以继续买蛋糕,直到柜台不满的时候,蛋糕师傅又可以继续做蛋糕放进柜台里了
好处
多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦满足条件,被挂起的线程又会自动被唤醒
使用阻塞队列后,就不需要开发人员自己考虑挂起和唤醒的问题,可以更专心在逻辑和业务方面
UML
给大家看一下这个大家族
1、ArrayBlockingQueue:有数组结构组成的有界阻塞队列
2、LinkedBlockingQueue:由链表结构组成的有界(但默认值是Integer.MAX_VALUE)阻塞队列
3、PriorityBlockingQueue:支持优先级排序的无界阻塞队列
4、DelayQueue:使用优先级队列实现的延迟无界队列
5、SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
6、LinkedTransferQueue:由链表结构组成的无界阻塞队列
7、LinkedBlockingDeque:由链表结构组成的双向阻塞队列
BlockQueue的核心方法
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | 不可用 | 不可用 |
现象 | 解析 |
---|---|
抛出异常 | 当阻塞队列满时,再往队列里add插入元素,会抛IllegalStateException: Queue full 当阻塞队列空时,再从队列里remove移除元素回抛NoSuchElementException |
特殊值 | 插入方法,成功true失败false 移除方法,成功返回出队列的元素,队列里面没有就返回null |
一直阻塞 | 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产线程知道put数据or响应中断退出 当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程知道队列可用 |
超时退出 | 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过后生产者线程会退出 |
用在哪
1、生产者消费者模式
2、线程池
3、消息中间件
先来回顾一下简单的生产消费模式
class ShareData {
private int num = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() {
try {
lock.lock();
// 判断 注意一定要用while进行判断,否则会出现虚假唤醒的情况
while (num != 0) {
// 等待,不能生产
condition.await();
}
// 干活
num++;
System.out.println(Thread.currentThread().getName() + "\t" + num);
// 通知唤醒
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() {
try {
lock.lock();
// 判断
while (num == 0) {
// 等待,不能生产
condition.await();
}
// 干活
num--;
System.out.println(Thread.currentThread().getName() + "\t" + num);
// 通知唤醒
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
/**
* 题目:一个初始值为零的变量,两个线程对其交替操作,一个加1一个减1,执行五轮
*/
public class ProdConsumerTraditionTest {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.increment();
}
}, "t1").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.decrement();
}
}, "t2").start();
}
}
注意:一定要用while进行判断,否则会出现虚假唤醒的情况
下面我们回到蛋糕店的例子,模拟个场景,使用阻塞队列来实现
假设现在蛋糕店在做活动,老板开启了秒杀活动,在五秒的时间内,蛋糕师傅做蛋糕,顾客买蛋糕,知道老板喊停,那么蛋糕师傅休息,顾客也不买了
class CakeShop {
// 是否开启活动,默认开启,进行生产+消费
private volatile boolean flag = true;
// 模拟蛋糕
private AtomicInteger num = new AtomicInteger();
// 展台,使用阻塞队列,通过构造器传入不同类型
private BlockingQueue<String> blockingQueue = null;
public CakeShop(BlockingQueue<String> blockingQueue) {
System.out.println(blockingQueue.getClass().getName());
this.blockingQueue = blockingQueue;
}
public void prod() throws InterruptedException {
String cake = null;
boolean result;
while (flag) {
cake = String.valueOf(num.incrementAndGet());
// 模拟做蛋糕的时间
TimeUnit.SECONDS.sleep(1);
// 将蛋糕放入展柜
result = blockingQueue.offer(cake);
if (result) {
System.out.println(Thread.currentThread().getName() + "\t 做蛋糕,插入队列" + cake + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t 做蛋糕,插入队列" + cake + "失败");
}
}
System.out.println(Thread.currentThread().getName() + "\t 老板喊停,表示flag = false,生产动作结束");
}
public void consumer() throws InterruptedException {
String data = null;
while (flag) {
// 从展柜拿蛋糕,但是等了两秒都没买到的话就不买了
data = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (null == data) {
flag = false;
System.out.println(Thread.currentThread().getName() + "\t 超过两秒没有买到,消费退出");
return;
}
System.out.println(Thread.currentThread().getName() + "\t 买蛋糕, 消费队列" + data + "成功");
}
System.out.println(Thread.currentThread().getName() + "\t 活动停了,表示flag = false,生产动作结束");
}
public void stop() {
this.flag = false;
}
}
/**
* volatile/CAS/atomicInteger/BlockQueue/线程交互/原子引用
*/
public class ProdConsumerBlockingQueueTest {
public static void main(String[] args) {
CakeShop cakeShop = new CakeShop(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 生产线程启动");
try {
cakeShop.prod();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "蛋糕师傅").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
try {
cakeShop.consumer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "顾客").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t 5秒钟到,老板叫停");
cakeShop.stop();
}
}
内容均来源于学习资料,在学习过程中进行记录,如有侵权联系作者进行删除