0
点赞
收藏
分享

微信扫一扫

【多线程】模拟阻塞队列,实现生产者消费者模型

@TOC

生产者消费者模型

什么是阻塞队列

阻塞队列 与 普通队列 的特性一样的:先进先出。
但是呢,相比于普通队列,阻塞队列也有着一些其它方面的功能!!!

  1. 线程安全 

  2. 产生阻塞效果
       2.1 如果队列为空,尝试出队一个元素,就会触发阻塞,一直阻塞到队列不为空为止。
       2.2 如果队列为满,尝试入队一个元素,就会触发阻塞,一直阻塞到队列不为满为止。

    基于上述特性,就可以实现“生产者消费者模型”。
    生产者消费者模型 是日常开发中,处理多线程问题的一个典型方式。

举个例子:

过年,吃饺子
既然吃饺子,就需要包饺子这件事。
而包出一个完美的饺子这件事很麻烦。
【和面,擀饺子皮,包饺子,煮/蒸。大概是这么一个流程,其中细节是非常多的】
如果数量非常多,就需要多人分工进行协作。

其中 和面 和 煮饺子 不太好进行分工。【一般和面是一个人负责,煮饺子也是一个人】
和面这件事,一坨面一起和。没有说拆成两个部分来和面的。那样口感就不一样了。
煮饺子,那就更简单了,一个人拿着勺子,等到煮熟了,直接捞起来就行了。

擀饺子皮 和 包饺子 就比较好分工了。
毕竟面皮是一张一张擀出来了,饺子也是一个一个包的。

我们主要考虑擀面皮 和 包饺子的过程。
假设 现有 A、B、C 三个人一起来擀饺子皮 + 包饺子。

协作方式1:

A、B、C 分别每个人都是先擀一张皮,然后再包一个饺子。
这种方式肯定是有效率的,毕竟三个人一起擀面皮和包饺子。肯定是比一个人要快的。

但是这种方式存在一定的问题,锁冲突比较激烈.
注意!擀饺子皮,需要一个重要的道具 “擀面杖”

在这里插入图片描述

问题就出在这里!擀面杖这个东西,一般只会买一个。

那么,如果此时A、B、C 三个都来擀面皮,故 三个人中,只能有一个人可以使用擀面杖,同时其他两个人,就需要等待,等待这个人使用完,让出来。然后,另外两个人就会出现竞争。
所以这个时候就会出现一系列的阻塞等待。

协作方式2:
A专门负责擀饺子皮,B和C专门负责包饺子。

因为 擀饺子皮的人,现在只有一个人
所以没有人跟他抢擀面杖(也就不会有锁的竞争了,同时也不会有阻塞等待的情况发生

此时,A就是饺子皮的生产者,要不断的生成一个些饺子皮。
B和C就是饺子皮的消费者,他们需要不断的 使用/消耗 饺子皮。

这种就是生产者消费者模型。
在这个模型中,既有生产者负责生产数据,消费者负责使用数据。
那么,生产者 和 消费者之间,需要有一个“桥梁” 来去进行沟通交互。
我们将 “桥梁” 称其为 “交易场所”。

放在 饺子 事件中,“交易场所” 就相当于 用来放饺子的那个 盖板。

A将生产出来的饺子皮放在盖板上,B、C消耗的饺子皮,要从盖板上面拿。

得有这样的一个空间来存放饺子皮,得有这样的一个空间来存储需要使用的数据。
这就是“交易场所”。

阻塞队列 就可以作为 生产者消费者模型 中的 “交易场所”。
生产者消费者模型,是实际开发中非常有用的一种多线程开发手段!!!
尤其是在服务器开发的场景中。

假设:

有两个服务器A、B

  1. A 作为入口服务器直接接收用户的网络请求。
  2. B 作为应用服务器,来给A提供一些数据。

情况一: 直接发送

image-20220509163158983

阻塞队列在开发中运用

情况二: 使用阻塞队列

image-20220509163333937

JAVA 中阻塞队列的主要用法

Java中内置阻塞队列,讲一下常用的入队和出队方法。

  1. 入队 put (阻塞) offer(无阻塞)

  2. 出队 take(阻塞) poll(无阻塞)

image-20220509163505950

模拟实现阻塞队列

  1. 先实现一个普通队列

  2. 加上线程安全
  3. 加上阻塞功能

因此 阻塞队列 是可以基于链表,也可以基于数组来实现
但是基于数组来实现阻塞队列更简单,所以写一个数组版本的阻塞队列

数组实现队列的重点就在于 循环队列

image-20220509164139892

所以

  1. > 先实现一个循环队列
class MyBlockingQueue{
    // 保存数据的本体
    private int[] data = new int[1000];
    // 有效元素个数
    private int usedSize;
    // 队头下标位置
    private int head;
    // 队尾下标位置
    private int rear;

    // 入队列
    public void put(int value){
        if(usedSize == this.data.length){
            // 如果队列满了,暂时先返回。
            return;
        }
        data[rear++] = value;
        //处理 rear 到达数组末尾的情况。
        if(rear >= data.length){
            rear = 0;
        }
        usedSize++;// 入队成功,元素个数加一。
    }
    // 出队列
    public Integer take(){
        if(usedSize == 0){
            // 如果队列为空,就返回一个 非法值
            return null;
        }
        int tmp = data[head];
        head++;
        if(head == data.length){
            head = 0;
        }
        usedSize--;
        return tmp;
    }
}

image-20220509164257334

  1. > 让队列在支持线程安全

    保证多线程环境下,调用这里的put 和 take 是没有问题的。
    使用加锁操作 synchronized

image-20220509164432378

  1. 实现阻塞

关键要点:使用 wait 和 notify机制。

image-20220509164531974

注意:
如果这里有三个线程都是使用的同一个 锁对象, notify 是 不可能实现精准唤醒 指定 wait 的。

notify 只能唤醒随机的一个等待的线程,不能做到精准。
如果想要精准,就必须使用不同的 锁对象。

想精准唤醒 t1,就必须专门为它创建一个锁对象 locker1,让t1 调用 locker1.wait。再对其进行 locker1.notify 才能唤醒

想精准唤醒 t2,就必须专门为它创建一个锁对象 locker2,让t2 调用 locker2.wait。再对其进行 locker2.notify 才能唤醒.
这样才能达到精准唤醒的效果。

代码:

class MyBlockingQueue{
    // 保存数据的本体
    private int[] data = new int[1000];
    // 有效元素个数
    private int usedSize;
    // 队头下标位置
    private int head;
    // 队尾下标位置
    private int rear;

    private Object locker = new Object();// 专门的锁对象
    // 入队列
    public void put(int value) throws InterruptedException {
        synchronized(locker){
            if(usedSize == this.data.length){
                // 如果队列满了,暂时先返回。
                //return;
                locker.wait();
            }
            data[rear++] = value;
            //处理 rear 到达数组末尾的情况。
            if(rear >= data.length){
                rear = 0;
            }
            usedSize++;// 入队成功,元素个数加一。
            locker.notify();
        }
    }

    // 出队列
    public Integer take() throws InterruptedException {
        synchronized(locker){
            if(usedSize == 0){
                // 如果队列为空,就返回一个 非法值
               // return null;
                locker.wait();
            }
            int tmp = data[head];
            head++;
            if(head == data.length){
                head = 0;
            }
            usedSize--;
            // 在 take成功之后,唤醒put中的等待。
            locker.notify();

            return tmp;
        }
    }
}

利用 阻塞队列 来构造一个 生产者和消费者模型

我通过构造两个线程,来实现一个简易的消费者生产者模型。

public class Test22 {
    private  static  MyBlockingQueue queue = new MyBlockingQueue();
    public static void main(String[] args) {
        // 实现一个 生产者消费者模型
        Thread producer = new Thread(()->{
            int num = 0;
           while (true){
               try {
                   System.out.println("生产了" + num);
                   queue.put(num);
                   num++;
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
        });
        producer.start();

        Thread customer = new Thread(()->{
           while (true){
               try {
                   int num = queue.take();
                   System.out.println("消费了"+num);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
        });
        customer.start();
    }
}

当前的场景中,只有一个消费者 和 一个 生产者。
如果多个生产者 和 消费者,那我们就多创建线程就行了。
为了更好看到效果,我们在给这这个程序中的“生产者”加上一个sleep。
让它生产的慢一些,此时消费者就只能跟生产的步伐走。
生产者生成一个,消费者就消费一个。

image-20220509164951335

下面我们来看一下执行效果。【生产的速度 没有消费速度快】

image-20220509165017443

我们再来将sleep代码的位置换到 消费者 代码中。
此时就是消费速度 没有生产速度快。
来看下面的效果

image-20220509165121669

完整的生产者消费者模型 + 阻塞队列

下面这个程序,是 生产速度非常,消费速度很慢。
取决于你给谁加上sleep

class MyBlockingQueue{
    // 保存数据的本体
    private int[] data = new int[1000];
    // 有效元素个数
    private int usedSize;
    // 队头下标位置
    private int head;
    // 队尾下标位置
    private int rear;

    private Object locker = new Object();// 专门的锁对象
    // 入队列
    public void put(int value) throws InterruptedException {
        synchronized(locker){
            if(usedSize == this.data.length){
                // 如果队列满了,暂时先返回。
                //return;
                locker.wait();
            }
            data[rear++] = value;
            //处理 rear 到达数组末尾的情况。
            if(rear >= data.length){
                rear = 0;
            }
            usedSize++;// 入队成功,元素个数加一。
            locker.notify();
        }
    }
    // 出队列
    public Integer take() throws InterruptedException {
        synchronized(locker){
            if(usedSize == 0){
                // 如果队列为空,就返回一个 非法值
               // return null;
                locker.wait();
            }
            int tmp = data[head];
            head++;
            if(head == data.length){
                head = 0;
            }
            usedSize--;
            // 在 take成功之后,唤醒put中的等待。
            locker.notify();

            return tmp;
        }
    }
}

public class Test22 {
    private  static  MyBlockingQueue queue = new MyBlockingQueue();
    public static void main(String[] args) {
        // 实现一个 生产者消费者模型
        Thread producer = new Thread(()->{
            int num = 0;
           while (true){
               try {
                   System.out.println("生产了" + num);
                   queue.put(num);
                   num++;
//                   Thread.sleep(500);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
        });
        producer.start();

        Thread customer = new Thread(()->{
           while (true){
               try {
                   int num = queue.take();
                   System.out.println("消费了"+num);
                   Thread.sleep(500);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
        });
        customer.start();
    }
}
举报

相关推荐

0 条评论