0
点赞
收藏
分享

微信扫一扫

Java多线程--阻塞队列(BlockingQueue)--使用/教程/实例

树下的老石头 2022-02-06 阅读 94

原文网址:Java多线程--阻塞队列(BlockingQueue)--使用/教程/实例_IT利刃出鞘的博客-CSDN博客

简介

说明

        本文用示例介绍Java中阻塞队列(BlockingQueue)的用法。

队列类型

        BlockingQueue有这几种类型:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue、DelayedWorkQueue。

队列类型说明
ArrayBlockingQueue

        基于数组的FIFO队列;有界;创建时必须指定大小;

        使用一个重入锁,默认使用非公平锁,入队和出队共用一个锁,互斥。

LinkedBlockingQueue

        基于链表的FIFO队列;有/无界;默认大小是 Integer.MAX_VALUE(无界),可自定义(有界);

        两个重入锁分别控制元素的入队和出队,用Condition进行线程间的唤醒和等待。

        吞吐量通常要高于ArrayBlockingQueue。

        默认大小的LinkedBlockingQueue将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如, 在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

SynchronousQueue

        无缓存的等待队列;    无界;可认为大小为0。
        不保存提交任务,直接提交出去,超出corePoolSize个任务,直接创建新线程来执行任务,直到(corePoolSize+新建线程)> maximumPoolSize。

        此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线 程具有增长的可能性。

        吞吐量通常要高于LinkedBlockingQueue。

        //也有地方说:是一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态

        详见下边的:CachedThreadPool的execute流程

PriorityBlockingQueue

        基于链表的优先级队列;有/无界;默认大小是 Integer.MAX_VALUE,可自定义;

        类似于LinkedBlockingQueue,但是其所含对象的排序不是FIFO,而是依据对象的自然顺序或者构造函数的Comparator决定。

DelayedWorkQueue

常用方法

放入数据

方法说明
offer(E e)

向队列尾部插入一个元素。

如果队列中有空闲则插入成功后返回 true

如果队列己满则丢弃当前元素然后返回false。

如果e元素为null则抛出NullPointerException异常。

该方法是非阻塞的。

offer(E o, long timeout, TimeUnit unit)可以设定等待的时间,若在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
add(E e)

内部调用offer方法。

与直接调用offer的区别:

add:失败时,抛出异常

offer:失败时,返回false

put(E e)

向队列尾部插入一个元素。

如果队列中有空闲则插入后直接返回。

如果队列己满则阻塞当前线程,直到队列有空闲插入成功后返回。

如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException异常而返回。

如果e元素为null则抛出NullPointerException异常。

获取数据

方法说明
poll()

获取当前队列头部元素并从队列里面移除它。

如果队列为空则返回null。

poll(long timeout, TimeUnit unit)

从BlockingQueue取出(会删除对象)一个队首的对象。

一旦在指定时间内有数据可取,则立即返回队列中的数据。

若直到时间超时还没有数据可取,返回失败。

take()

获取当前队列头部元素并从队列里面移除它。

如果队列为空则阻塞当前线程直到队列不为空然后返回元素;

如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException异常而返回。

drainTo()

一次性从BlockingQueue获取(会删除对象)所有可用的数据对象(可指定获取数据的个数)。

本方法可提升获取数据效率,不需要多次分批加锁或释放锁。

其他方法 

方法说明
remainingCapacity()获取队列中剩余的空间
contains(Object o)判断队列中是否拥有该值。
remove(Object o)从队列中移除指定的值。
size()

获得队列中有多少值。

(返回AtomicLong的值)

ArrayBlockingQueue

简介

        ArrayBlockingQueue通过数组实现的FIFO有界阻塞队列,它的大小在实例被初始化的时候就被固定了,不能更改。

        该类支持一个可选的公平策略,用于被阻塞等待的线程获取独占锁的排序,因为ArrayBlockingQueue内部的操作都需要获取一个ReentrantLock锁,该锁是支持公平策略的,所以ArrayBlockingQueue的公平策略就直接作用于ReentrantLock锁,决定线程是否有公平获取锁的权利。默认情况下是非公平的,公平模式下队列按照FIFO顺序授予线程访问权。公平性通常会降低吞吐量,但会降低可变性并避免饥饿。

ArrayBlockingQueue的缺陷

        通过源码可以看见,ArrayBlockingQueue内部的几乎每一个操作方法都需要先获取同一个ReentrantLock独占锁才能进行,这极大的降低了吞吐量,几乎每个操作都会阻塞其它操作,最主要是插入操作和取出操作相互之间互斥。所以ArrayBlockingQueue不适用于需要高吞吐量的高效率数据生成与消费场景。LinkedBlockingQueue就能弥补其低吞吐量的缺陷。

实例

创建一个corePoolSize为2,maximumPoolSize为3的线程池。其中ArrayBlockingQueue设置缓存2个任务。执行6个任务。ArrayBlockingQueue为有界队列:

  1. 任务1和2在核心线程中执行;
  2. 任务3和4进来时,放到ArrayBlockingQueue缓存队列中,并且只能放2个(ArrayBlockingQueue设置的大小为2);
  3. 任务5和6进来的时候,任务5新建线程来执行任务,已经达到最大线程数3,所以任务6拒绝;
  4. 当有线程执行完的时候,再将任务3和4从队列中取出执行

创建线程池代码如下:

  /**
     * ArrayBlockingQueue
     */
    private static void arrayQueue() {
        System.out.println("\n\n =======ArrayBlockingQueue====== \n\n");
        Executor executors = new ThreadPoolExecutor(
                2, 3, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                new RejectHandler());
        execute(executors);
    }

 执行结果如下

1 is running... 
2 is running... 
6 is rejected ^^ //6被拒
5 is running...  //5新建线程执行
1 is end !!! 
2 is end !!! 
3 is running... //1和2执行完之后3和4才执行
4 is running... 
5 is end !!! 

LinkedBlockingQueue

简介

LinkedBlockingQueue和ArrayBlockingQueue的相同点:

  1. 是FIFO队列,不允许插入null值。
  2. 容量在实例被构造完成之后不允许被更改

不同点

LinkedBlockingQueueArrayBlockingQueue
大小指定实例化时可指定队列大小,也可以不指定大小(这时候默认就是Integer.MAX_VALUE)。实例化时必须指定大小。
吞吐量

大。

采用了“双锁队列” 算法,元素的入队和出队分别由putLock、takeLock两个独立的可重入锁来实现。

小。

几乎每一个方法都需要先获取同一个ReentrantLock独占锁才能进行。

实例

创建一个corePoolSize为2,maximumPoolSize为3的线程池。无界队列。同样执行6个任务

  1. 核心线程执行任务1和2,其它的任务3~6放到队列中
  2. 执行完1和2,将3和4从队列中取出执行
  3. 执行完3和4,将5和6从队列中取出

创建线程池代码如下:

 
    /**
     * LinkedBlockingQueue
     */
    private static void linkedQueue() {
        System.out.println("\n\n =======LinkedBlockingQueue====== \n\n");
        Executor executors = new ThreadPoolExecutor(
                2, 3, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new RejectHandler());
        execute(executors);
    }

运行结果如下:

1 is running... 
2 is running... //中间线程休眠 
2 is end !!!   //10s之后才运行完
1 is end !!! 
3 is running...  //任务3和4才执行
4 is running... 
4 is end !!! 
3 is end !!! 
6 is running... 
5 is running... 
5 is end !!! 
6 is end !!! 

SynchronousQueue

说明

        创建一个corePoolSize为2,maximumPoolSize为3的线程池。执行6个任务。根据我的理解,SynchrousQueue是个一个无缓存的队列(理论依据为SynchrousQueue源码可以看到:isEmpty()始终为true;size()始终返回0)

根据参数设置应该只可以执行3个任务:

  1. 2个核心线程执行2个任务;
  2. 第3个任务的时候,创建线程来执行任务3;
  3. 当第4个任务来的时候,此时已经超过了maximumPoolSize,所以拒绝任务。

代码

     /**
     * SynchronousQueue
     */
    private static void syncQueue() {
        System.out.println("\n\n =======SynchronousQueue====== \n\n");
        Executor executors = new ThreadPoolExecutor(
                2, 3, 30, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new RejectHandler());
        execute(executors);
    }

执行结果

1 is running... 
4 is rejected ^^ //4被拒
2 is running... 
3 is running... 
5 is rejected ^^  //5被拒
6 is rejected ^^  //6被拒
3 is end !!! 
1 is end !!! 
2 is end !!! 

PriorityBlockingQueue

简介

        PriorityBlockingQueue是一个无限容量的阻塞队列,由于容量是无限的所以put等入队操作其实不存在阻塞,只要内存足够都能够立即入队成功,当然多个入队操作的线程之间还是存在竞争唯一锁的互斥访问。虽然PriorityBlockingQueue逻辑上是无界的,但是尝试添加元素时还是可能因为资源耗尽而抛出OutOfMemoryError。

        该队列也不允许放入null值,它使用与类java.util.PriorityQueue 相同的排序规则,也不允许放入不可比较的对象,这样做会导致ClassCastException。

        值得注意的是,虽然PriorityBlockingQueue叫优先级队列,但是并不是说元素一入队就会按照排序规则被排好序,而是只有通过调用take、poll方法出队或者drainTo转移出的队列顺序才是被优先级队列排过序的。所以通过调用 iterator() 以及可拆分迭代器 spliterator() 方法返回的迭代器迭代的元素顺序都没有被排序。如果需要有序遍历可以通过 Arrays.sort(pq.toArray()) 方法来排序。注意peek方法永远只获取且不删除第一个元素,所以多次调用peek都是返回同样的值。

        PriorityBlockingQueue其实是通过Comparator来排序的,要么入队的元素实现了Comparator接口(即所谓的自然排序),要么构造PriorityBlockingQueue实例的时候传入一个统一的Comparator实例,如果两者兼备那么以后者为准

        PriorityBlockingQueue不保证具有相同优先级的元素顺序,但是你可以定义自定义类或比较器,通过辅助属性来决定优先级相同的元素的顺序,后文会举例说明。

DelayedWorkQueue

简介

        为什么不直接使用DelayQueue而要重新实现一个DelayedWorkQueue呢,可能是了方便在实现过程中加入一些扩展。

使用场景

  • 重试机制实现。比如当调用接口失败后,把当前调用信息放入delay=10s的元素,然后把元素放入队列,那么这个队列就是一个重试队列,一个线程通过take方法获取需要重试的接口,take返回则接口进行重试,失败则再次放入队列,同时也可以在元素加上重试次数。

  • TimerQueue的内部实现

顺序消费实例

场景

机器要对手机按顺序做如下任务:生产、打包、发货。消费者等待收货。

代码

package org.example.a;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class Phone {
    /**
     * 手机的状态:
     * PRODUCED: 已生产
     * PACKED: 已打包
     * DELIVERED: 已发货
     * <p>手机的状态只能由PRODUCED->PACKED->DELIVERED转变
     */
    public enum Status {
        PRODUCED, PACKED, DELIVERED
    }
    private Status status = Status.PRODUCED;//默认状态为PRODUCED
    private final int id;
    public Phone(int id) { this.id =  id;}
    public void pack() {status = Status.PACKED;}
    public void deliver() {status = Status.DELIVERED;}
    public Status getStatus() {return status;}
    public int getId() {return id;}
    public String toString() {
        return "Phone id: " + id + ", status: " + status;
    }
}

class PhoneQueue extends LinkedBlockingQueue<Phone> {}

/**
 * 生产手机的任务。
 */
class Producer implements Runnable {
    private PhoneQueue PhoneQueue;
    private int count = 0;
    private Random random = new Random(47);
    public Producer(PhoneQueue queue) {
        this.PhoneQueue = queue;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                TimeUnit.MILLISECONDS.sleep(300 + random.nextInt(500));
                //生产一片手机,这些手机是有序的
                Phone Phone = new Phone(count++);
                System.out.println(Phone);
                //放到PhoneQueue中
                PhoneQueue.put(Phone);
            }
        } catch (InterruptedException e) {
            System.out.println("Producer interrupted.");
        }
        System.out.println("Producer off.");
    }
}

/**
 * 打包的任务。
 */
class Packer implements Runnable {
    private PhoneQueue producedQueue;
    private PhoneQueue PackedQueue;
    public Packer(PhoneQueue producedQueue, PhoneQueue PackedQueue) {
        this.producedQueue = producedQueue;
        this.PackedQueue = PackedQueue;
    }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //在取得下一个手机之前会一直阻塞
                Phone Phone = producedQueue.take();
                Phone.pack();
                System.out.println(Phone);
                PackedQueue.put(Phone);
            }
        } catch (InterruptedException e) {
            System.out.println("Packer interrupted.");
        }
        System.out.println("Packer off.");
    }
}

/**
 * 发货的任务。
 */
class Deliverer implements Runnable {
    private PhoneQueue butteredQueue;
    private PhoneQueue finishedQueue;
    public Deliverer(PhoneQueue butteredQueue, PhoneQueue finishedQueue) {
        this.finishedQueue = finishedQueue;
        this.butteredQueue = butteredQueue;
    }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //在取得下一个手机之前会一直阻塞
                Phone phone = butteredQueue.take();
                phone.deliver();
                System.out.println(phone);
                finishedQueue.put(phone);
            }
        } catch (InterruptedException e) {
            System.out.println("Deliverer interrupted.");
        }
        System.out.println("Deliverer off.");
    }
}

/**
 * 买手机的人,消费者。
 */
class Consumer implements Runnable {
    private PhoneQueue finishedQueue;
    private int count = 0;
    public Consumer(PhoneQueue finishedQueue) {
        this.finishedQueue = finishedQueue;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //在取得下一个手机之前会一直阻塞
                Phone phone = finishedQueue.take();
                //验证取得的手机是有序的,而且状态是DELIVERED的
                if (phone.getId() != count++ ||
                        phone.getStatus() != Phone.Status.DELIVERED) {
                    System.out.println("Error -> " + phone);
                    System.exit(-1);
                } else {
                    //使用手机
                    System.out.println(phone + "->Use");
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Consumer interrupted.");
        }
        System.out.println("Consumer off.");
    }
}
public class Demo {
    public static void main(String[] args) {
        PhoneQueue producedQueue = new PhoneQueue();
        PhoneQueue packedQueue = new PhoneQueue();
        PhoneQueue deliveredQueue = new PhoneQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Producer(producedQueue));
        exec.execute(new Packer(producedQueue, packedQueue));
        exec.execute(new Deliverer(packedQueue, deliveredQueue));
        exec.execute(new Consumer(deliveredQueue));
        try {
            TimeUnit.SECONDS.sleep(5);
        }catch (Exception e){
            e.printStackTrace();
        }
        exec.shutdownNow();
    }
}

执行结果

Phone id: 0, status: PRODUCED
Phone id: 0, status: PACKED
Phone id: 0, status: DELIVERED
Phone id: 0, status: DELIVERED->Use
Phone id: 1, status: PRODUCED
Phone id: 1, status: PACKED
Phone id: 1, status: DELIVERED
Phone id: 1, status: DELIVERED->Use
Phone id: 2, status: PRODUCED
Phone id: 2, status: PACKED
Phone id: 2, status: DELIVERED
Phone id: 2, status: DELIVERED->Use
Phone id: 3, status: PRODUCED
Phone id: 3, status: PACKED
Phone id: 3, status: DELIVERED
Phone id: 3, status: DELIVERED->Use
Phone id: 4, status: PRODUCED
Phone id: 4, status: PACKED
Phone id: 4, status: DELIVERED
Phone id: 4, status: DELIVERED->Use
Phone id: 5, status: PRODUCED
Phone id: 5, status: PACKED
Phone id: 5, status: DELIVERED
Phone id: 5, status: DELIVERED->Use
Phone id: 6, status: PRODUCED
Phone id: 6, status: PACKED
Phone id: 6, status: DELIVERED
Phone id: 6, status: DELIVERED->Use
Phone id: 7, status: PRODUCED
Phone id: 7, status: PACKED
Phone id: 7, status: DELIVERED
Phone id: 7, status: DELIVERED->Use
Consumer interrupted.
Packer interrupted.
Packer off.
Deliverer interrupted.
Deliverer off.
Producer interrupted.
Producer off.
Consumer off.

其他网址

Java同步数据结构之ArrayBlockingQueue - 莫待樱开春来踏雪觅芳踪 - 博客园

BlockingQueue的原理及使用方法_Java_Yolanda_NuoNuo的专栏-CSDN博客Java并发(四)BlockingQueue的使用 - 摆渡者 - OSCHINALinkedBlockingQueue 的使用_Java_一只小棉花的博客-CSDN博客

线程池队列_watson1360884839的博客-CSDN博客
线程池的三种缓存队列_Java_nihaomabmt的专栏-CSDN博客

举报

相关推荐

0 条评论