文章目录
- 前言
- 一、生产消费模式?
- 二、BlockingQueue的实现
- 三、实战
- 1 添加元素 : put add offer
- 2 、删除元素: remove poll take
- 总结
前言
BlockingQueue是一个接口,是一个能够保证线程安全的队列;一般适用于生产-消费模式的场景;
一、生产消费模式?
该模式能够简化开发过程,一方面消除了生产者类与消费者类之间的代码依赖性,另一方面将生产数据的过程与使用数据的过程解耦简化负载。
我们自己coding实现这个模式的时候,因为需要让多个线程操作共享变量(即资源),所以很容易引发线程安全问题,造成重复消费和死锁,尤其是生产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者,这些个等待-唤醒逻辑都需要自己实现。
这么容易出错的事情,JDK当然帮我们做啦,这就是阻塞队列(BlockingQueue),你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。
二、BlockingQueue的实现
- ArrayBlockingQueue 由数组支持的有界队列
- LinkedBlockingQueue 由链接节点支持的可选有界队列
- PriorityBlockingQueue 由优先级堆支持的无界优先级队列
- DelayQueue 由优先级堆支持的、基于时间的调度队列
有界无界的意思是 有没有队列的长度限制
方法 | 说明 |
add() | 如果插入成功则返回 true,否则抛出 IllegalStateException 异常 |
put() | 将指定的元素插入队列,如果队列满了,那么会阻塞直到有空间插入 |
offer() | 如果插入成功则返回 true,否则返回 false |
offer(E e, long timeout, TimeUnit unit) | 尝试将元素插入队列,如果队列已满,那么会阻塞直到有空间插入 |
方法 | 说明 |
take() | 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用 |
poll(long timeout, TimeUnit unit) | 检索并删除队列的头部,如有必要,等待指定的等待时间以使元素可用,如果超时,则返回 null |
三、实战
1 添加元素 : put add offer
- add(E e) :往队列插入数据,当队列满时,插入元素时会抛出IllegalStateException异常;
- offer(E e):当往队列插入数据时,插入成功返回true,否则则返回false。当队列满时不会抛出异常;
- put:当阻塞队列容量已经满时,往阻塞队列插入数据的线程会被阻塞,直至阻塞队列已经有空余的容量可供使用
代码验证
/**
* @author fulin
*/
public class TestPark {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(5);
@SneakyThrows
public static void main(String[] args) {
TestPark testPark = new TestPark();
new Thread(() -> {
//arrayBlockingQueue.take();
for (int i = 0; i < 6; i++) {
try {
// testPark.arrayBlockingQueue.put(RandomUtil.randomInt());
testPark.arrayBlockingQueue.add(RandomUtil.randomInt());
// boolean offer = testPark.arrayBlockingQueue.offer(RandomUtil.randomInt());
// System.out.println(offer);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
System.out.println(testPark.arrayBlockingQueue.size());
}).start();
// 多线程 验证put 阻塞现象
// Thread.sleep(6000);
// new Thread(() -> {
// testPark.arrayBlockingQueue.poll();
// }).start();
}
}
1 add 验证
2 put 验证
3 offer 验证: 一共想放入六个元素,但是集合只有五个,那么当插入第六个的时候,线程会被阻塞; 我们再次启动一个线程,然后将之前的集合中的一个元素移除掉,此时有了空闲位置,那么offer才会成功,所以第一个线程将会阻塞大概6s;
这就是offer 会阻塞的代码验证;
2 、删除元素: remove poll take
- remove(Object o):从队列中删除数据,成功则返回true,否则为false
- poll:删除数据,当队列为空时,返回null;
- take():当阻塞队列为空时,获取队头数据的线程会被阻塞;
代码验证
/**
* @author fulin
*/
public class TestPark {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(5);
@SneakyThrows
public static void main(String[] args) {
TestPark testPark = new TestPark();
// 多线程 验证take 阻塞现象
new Thread(() -> {
try {
testPark.arrayBlockingQueue.take();
System.out.println("take成功");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
Thread.sleep(2000);
new Thread(() -> {
//arrayBlockingQueue.take();
for (int i = 0; i < 6; i++) {
try {
testPark.arrayBlockingQueue.put(RandomUtil.randomInt());
// testPark.arrayBlockingQueue.add(RandomUtil.randomInt());
// boolean offer = testPark.arrayBlockingQueue.offer(RandomUtil.randomInt());
// System.out.println(offer);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
System.out.println(testPark.arrayBlockingQueue.size());
}).start();
}
}
1 remove poll
2 take 验证take获删除元素,当集合为空那么会被阻塞,当经过两秒之后,另一个线程启动,会将元素开始放入集合中,那么阻塞才会结束;
总结
当使用BlockingQueue的时候, 这里要注意的最重要的事情是 BlockingQueue 用于协调生产和消费者它们之间的工作。
一般来说 会使用 DelayQueue 实现生产消费的延迟场景的需求;
LinkedBlockingQueue 用它来实现有界队列的生产消费模式;
一般不推荐使用无界队列,因为出现问题后将一直导致oom;