@TOC
生产者消费者模型
什么是阻塞队列
阻塞队列 与 普通队列 的特性一样的:先进先出。
但是呢,相比于普通队列,阻塞队列也有着一些其它方面的功能!!!
-
线程安全
-
产生阻塞效果
2.1 如果队列为空,尝试出队一个元素,就会触发阻塞,一直阻塞到队列不为空为止。
2.2 如果队列为满,尝试入队一个元素,就会触发阻塞,一直阻塞到队列不为满为止。基于上述特性,就可以实现“生产者消费者模型”。
生产者消费者模型 是日常开发中,处理多线程问题的一个典型方式。
举个例子:
过年,吃饺子
既然吃饺子,就需要包饺子这件事。
而包出一个完美的饺子这件事很麻烦。
【和面,擀饺子皮,包饺子,煮/蒸。大概是这么一个流程,其中细节是非常多的】
如果数量非常多,就需要多人分工进行协作。
其中 和面 和 煮饺子 不太好进行分工。【一般和面是一个人负责,煮饺子也是一个人】
和面这件事,一坨面一起和。没有说拆成两个部分来和面的。那样口感就不一样了。
煮饺子,那就更简单了,一个人拿着勺子,等到煮熟了,直接捞起来就行了。
擀饺子皮 和 包饺子 就比较好分工了。
毕竟面皮是一张一张擀出来了,饺子也是一个一个包的。
我们主要考虑擀面皮 和 包饺子的过程。假设 现有 A、B、C 三个人一起来擀饺子皮 + 包饺子。
协作方式1:
A、B、C 分别每个人都是先擀一张皮,然后再包一个饺子。
这种方式肯定是有效率的,毕竟三个人一起擀面皮和包饺子。肯定是比一个人要快的。
但是这种方式存在一定的问题,锁冲突比较激烈.
注意!擀饺子皮,需要一个重要的道具 “擀面杖”
问题就出在这里!擀面杖这个东西,一般只会买一个。
那么,如果此时A、B、C 三个都来擀面皮,故 三个人中,只能有一个人可以使用擀面杖,同时其他两个人,就需要等待,等待这个人使用完,让出来。然后,另外两个人就会出现竞争。
所以这个时候就会出现一系列的阻塞等待。
协作方式2:
A专门负责擀饺子皮,B和C专门负责包饺子。
因为 擀饺子皮的人,现在只有一个人
所以没有人跟他抢擀面杖
(也就不会有锁的竞争了,同时也不会有阻塞等待的情况发生
)
此时,A就是饺子皮的生产者,要不断的生成一个些饺子皮。
B和C就是饺子皮的消费者,他们需要不断的 使用/消耗 饺子皮。
这种就是生产者消费者模型。
在这个模型中,既有生产者负责生产数据,消费者负责使用数据。
那么,生产者 和 消费者之间,需要有一个“桥梁” 来去进行沟通交互。
我们将 “桥梁” 称其为 “交易场所”。
放在 饺子 事件中,“交易场所” 就相当于 用来放饺子的那个 盖板。
A将生产出来的饺子皮放在盖板上,B、C消耗的饺子皮,要从盖板上面拿。
得有这样的一个空间来存放饺子皮,得有这样的一个空间来存储需要使用的数据。
这就是“交易场所”。
阻塞队列 就可以作为 生产者消费者模型 中的 “交易场所”。
生产者消费者模型,是实际开发中非常有用的一种多线程开发手段!!!
尤其是在服务器开发的场景中。
假设:
有两个服务器A、B
- A 作为入口服务器直接接收用户的网络请求。
- B 作为应用服务器,来给A提供一些数据。
情况一: 直接发送
阻塞队列在开发中运用
情况二: 使用阻塞队列
JAVA 中阻塞队列的主要用法
Java中内置阻塞队列,讲一下常用的入队和出队方法。
-
入队
put (阻塞) offer(无阻塞) 出队
take(阻塞) poll(无阻塞)
模拟实现阻塞队列
-
先实现一个
普通队列
- 加上
线程安全
- 加上
阻塞功能
因此 阻塞队列 是可以基于链表,也可以基于数组来实现
但是基于数组来实现阻塞队列更简单,所以写一个数组版本的阻塞队列
数组实现队列
的重点就在于 循环队列
所以
- >
先实现一个循环队列
class MyBlockingQueue{
// 保存数据的本体
private int[] data = new int[1000];
// 有效元素个数
private int usedSize;
// 队头下标位置
private int head;
// 队尾下标位置
private int rear;
// 入队列
public void put(int value){
if(usedSize == this.data.length){
// 如果队列满了,暂时先返回。
return;
}
data[rear++] = value;
//处理 rear 到达数组末尾的情况。
if(rear >= data.length){
rear = 0;
}
usedSize++;// 入队成功,元素个数加一。
}
// 出队列
public Integer take(){
if(usedSize == 0){
// 如果队列为空,就返回一个 非法值
return null;
}
int tmp = data[head];
head++;
if(head == data.length){
head = 0;
}
usedSize--;
return tmp;
}
}
- >
让队列在支持线程安全
保证多线程环境下,调用这里的put 和 take 是没有问题的。
使用加锁操作 synchronized
实现阻塞
关键要点:使用 wait 和 notify机制。
注意:
如果这里有三个线程都是使用的同一个 锁对象, notify 是 不可能实现精准唤醒 指定 wait 的。
notify 只能唤醒随机的一个等待的线程,不能做到精准。
如果想要精准,就必须使用不同的 锁对象。
想精准唤醒 t1
,就必须专门为它创建一个锁对象 locker1
,让t1 调用 locker1.wait
。再对其进行 locker1.notify
才能唤醒
想精准唤醒 t2
,就必须专门为它创建一个锁对象 locker2
,让t2 调用 locker2.wait
。再对其进行 locker2.notify
才能唤醒.
这样才能达到精准唤醒
的效果。
代码:
class MyBlockingQueue{
// 保存数据的本体
private int[] data = new int[1000];
// 有效元素个数
private int usedSize;
// 队头下标位置
private int head;
// 队尾下标位置
private int rear;
private Object locker = new Object();// 专门的锁对象
// 入队列
public void put(int value) throws InterruptedException {
synchronized(locker){
if(usedSize == this.data.length){
// 如果队列满了,暂时先返回。
//return;
locker.wait();
}
data[rear++] = value;
//处理 rear 到达数组末尾的情况。
if(rear >= data.length){
rear = 0;
}
usedSize++;// 入队成功,元素个数加一。
locker.notify();
}
}
// 出队列
public Integer take() throws InterruptedException {
synchronized(locker){
if(usedSize == 0){
// 如果队列为空,就返回一个 非法值
// return null;
locker.wait();
}
int tmp = data[head];
head++;
if(head == data.length){
head = 0;
}
usedSize--;
// 在 take成功之后,唤醒put中的等待。
locker.notify();
return tmp;
}
}
}
利用 阻塞队列 来构造一个 生产者和消费者模型
我通过构造两个线程,来实现一个简易的消费者生产者模型。
public class Test22 {
private static MyBlockingQueue queue = new MyBlockingQueue();
public static void main(String[] args) {
// 实现一个 生产者消费者模型
Thread producer = new Thread(()->{
int num = 0;
while (true){
try {
System.out.println("生产了" + num);
queue.put(num);
num++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
Thread customer = new Thread(()->{
while (true){
try {
int num = queue.take();
System.out.println("消费了"+num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
}
}
当前的场景中,只有一个消费者 和 一个 生产者。
如果多个生产者 和 消费者,那我们就多创建线程就行了。
为了更好看到效果,我们在给这这个程序中的“生产者”加上一个sleep。
让它生产的慢一些,此时消费者就只能跟生产的步伐走。
生产者生成一个,消费者就消费一个。
下面我们来看一下执行效果。【生产的速度 没有消费速度快】
我们再来将sleep代码的位置换到 消费者 代码中。
此时就是消费速度 没有生产速度快。
来看下面的效果
完整的生产者消费者模型 + 阻塞队列
下面这个程序,是 生产速度非常,消费速度很慢。
取决于你给谁加上sleep
class MyBlockingQueue{
// 保存数据的本体
private int[] data = new int[1000];
// 有效元素个数
private int usedSize;
// 队头下标位置
private int head;
// 队尾下标位置
private int rear;
private Object locker = new Object();// 专门的锁对象
// 入队列
public void put(int value) throws InterruptedException {
synchronized(locker){
if(usedSize == this.data.length){
// 如果队列满了,暂时先返回。
//return;
locker.wait();
}
data[rear++] = value;
//处理 rear 到达数组末尾的情况。
if(rear >= data.length){
rear = 0;
}
usedSize++;// 入队成功,元素个数加一。
locker.notify();
}
}
// 出队列
public Integer take() throws InterruptedException {
synchronized(locker){
if(usedSize == 0){
// 如果队列为空,就返回一个 非法值
// return null;
locker.wait();
}
int tmp = data[head];
head++;
if(head == data.length){
head = 0;
}
usedSize--;
// 在 take成功之后,唤醒put中的等待。
locker.notify();
return tmp;
}
}
}
public class Test22 {
private static MyBlockingQueue queue = new MyBlockingQueue();
public static void main(String[] args) {
// 实现一个 生产者消费者模型
Thread producer = new Thread(()->{
int num = 0;
while (true){
try {
System.out.println("生产了" + num);
queue.put(num);
num++;
// Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
Thread customer = new Thread(()->{
while (true){
try {
int num = queue.take();
System.out.println("消费了"+num);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
}
}