0
点赞
收藏
分享

微信扫一扫

回归Java基础:LinkedBlockingQueue阻塞队列解析

前言

整理了阻塞队列LinkedBlockingQueue的学习笔记,希望对大家有帮助。有哪里不正确,欢迎指出,感谢。

LinkedBlockingQueue的概述

LinkedBlockingQueue的继承体系图

我们先来看看LinkedBlockingQueue的继承体系。使用IntelliJ IDEA查看类的继承关系图形回归Java基础:LinkedBlockingQueue阻塞队列解析_出队

  • 蓝色实线箭头是指类继承关系
  • 绿色箭头实线箭头是指接口继承关系
  • 绿色虚线箭头是指接口实现关系。

LinkedBlockingQueue实现了序列化接口 Serializable,因此它有序列化的特性。LinkedBlockingQueue实现了BlockingQueue接口,BlockingQueue继承了Queue接口,因此它拥有了队列Queue相关方法的操作。

LinkedBlockingQueue的类图

类图来自Java并发编程之美回归Java基础:LinkedBlockingQueue阻塞队列解析_空指针异常_02

LinkedBlockingQueue主要特性:

  1. LinkedBlockingQueue底层数据结构为单向链表。
  2. LinkedBlockingQueue 有两个Node节点,一个head节点,一个tail节点,只能从head取元素,从tail添加元素。
  3. LinkedBlockingQueue 容量是一个原子变量count,它的初始值为0。
  4. LinkedBlockingQueue有两把ReentrantLock的锁,一把控制元素入队,一把控制出队,保证在并发情况下的线程安全。
  5. LinkedBlockingQueue 有两个条件变量,notEmpty 和 notFull。它们内部均有一个条件队列,存放着出入队列被阻塞的线程,这其实是生产者-消费者模型。

LinkedBlockingQueue的重要成员变量

  1. //容量范围,默认值为 Integer.MAX_VALUE
  2. private final int capacity;

  3. //当前队列元素个数
  4. private final AtomicInteger count = new AtomicInteger();

  5. //头结点
  6. transient Node<E> head;

  7. //尾节点
  8. private transient Node<E> last;

  9. //take, poll等方法的可重入锁
  10. private final ReentrantLock takeLock = new ReentrantLock();

  11. //当队列为空时,执行出队操作(比如take )的线程会被放入这个条件队列进行等待
  12. private final Condition notEmpty = takeLock.newCondition();

  13. //put, offer等方法的可重入锁
  14. private final ReentrantLock putLock = new ReentrantLock();

  15. //当队列满时, 执行进队操作( 比如put)的线程会被放入这个条件队列进行等待
  16. private final Condition notFull = putLock.newCondition();

LinkedBlockingQueue的构造函数

LinkedBlockingQueue有三个构造函数:

  1. 无参构造函数,容量为Integer.MAX
  2. public LinkedBlockingQueue() {
  3. this(Integer.MAX_VALUE);
  4. }

     2. 设置指定容量的构造器

  1. public LinkedBlockingQueue(int capacity) {
  2. if (capacity <= 0) throw new IllegalArgumentException();
  3. //设置队列大小
  4. this.capacity = capacity;
  5. //new一个null节点,head、tail节点指向该节点
  6. last = head = new Node<E>(null);
  7. }

     3. 传入集合,如果调用该构造器,容量默认也是Integer.MAX_VALUE

  1. public LinkedBlockingQueue(Collection<? extends E> c) {
  2. //调用指定容量的构造器
  3. this(Integer.MAX_VALUE);
  4. //获取put, offer的可重入锁
  5. final ReentrantLock putLock = this.putLock;
  6. putLock.lock();
  7. try {
  8. int n = 0;
  9. //循环向队列中添加集合中的元素
  10. for (E e : c) {
  11. if (e == null)
  12. throw new NullPointerException();
  13. if (n == capacity)
  14. throw new IllegalStateException("Queue full");
  15. //将队列的last节点指向该节点
  16. enqueue(new Node<E>(e));
  17. ++n;
  18. }
  19. //更新容量值
  20. count.set(n);
  21. } finally {
  22. //释放锁
  23. putLock.unlock();
  24. }
  25. }

LinkedBlockingQueue底层Node类

Node源码

  1. static class Node<E> {
  2. // 当前节点的元素值
  3. E item;
  4. // 下一个节点的索引
  5. Node<E> next;
  6. //节点构造器
  7. Node(E x) {
  8. item = x;
  9. }
  10. }

LinkedBlockingQueue的节点符合单向链表的数据结构要求:

  • 一个成员变量为当前节点的元素值
  • 一个成员变量是下一节点的索引
  • 构造方法的唯一参数节点元素值。

Node节点图

item表示当前节点的元素值,next表示指向下一节点的指针

回归Java基础:LinkedBlockingQueue阻塞队列解析_出队_03

LinkedBlockingQueue常用操作

offer操作

入队方法,其实就是向队列的尾部插入一个元素。如果元素为空,抛出空指针异常。如果队列已满,则丢弃当前元素,返回false,它是非阻塞的

offer源代码

offer方法源码如下:

  1. public boolean offer(E e) {
  2. //为空直接抛空指针
  3. if (e == null) throw new NullPointerException();
  4. final AtomicInteger count = this.count;
  5. //如果当前队列满了的话,直接返回false
  6. if (count.get() == capacity)
  7. return false;
  8. int c = -1;
  9. //构造新节点
  10. Node<E> node = new Node<E>(e);
  11. 获取putLock独占锁
  12. final ReentrantLock putLock = this.putLock;
  13. putLock.lock();
  14. try {
  15. //判断队列是否已满
  16. if (count.get() < capacity) {
  17. //进队列
  18. enqueue(node);
  19. //递增元素计数
  20. c = count.getAndIncrement();
  21. //如果元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程
  22. if (c + 1 < capacity)
  23. notFull.signal();
  24. }
  25. } finally {
  26. //释放锁
  27. putLock.unlock();
  28. }
  29. //如果容量为0,则
  30. if (c == 0)
  31. //激活 notEmpty 的条件队列,唤醒被阻塞的线程
  32. signalNotEmpty();
  33. return c >= 0;
  34. }

enqueue方法源码如下:

  1. private void enqueue(Node<E> node) {
  2. //从尾节点加进去
  3. last = last.next = node;
  4. }

为了形象生动,我们用一张图来看看往队列里依次放入元素A和元素B。图片参考来源【细谈Java并发】谈谈LinkedBlockingQueue回归Java基础:LinkedBlockingQueue阻塞队列解析_流程图_04

signalNotEmpty方法源码如下

  1. private void signalNotEmpty() {
  2. //获取take独占锁
  3. final ReentrantLock takeLock = this.takeLock;
  4. takeLock.lock();
  5. try {
  6. //唤醒notEmpty条件队列里被阻塞的线程
  7. notEmpty.signal();
  8. } finally {
  9. //释放锁
  10. takeLock.unlock();
  11. }
  12. }

offer执行流程图

回归Java基础:LinkedBlockingQueue阻塞队列解析_空指针异常_05

基本流程:

  • 判断元素是否为空,如果是,就抛出空指针异常。
  • 判读队列是否已满,如果是,添加失败,返回false。
  • 如果队列没满,构造Node节点,上锁。
  • 判断队列是否已满,如果队列没满,Node节点在队尾加入队列待。
  • 加入队列后,判断队列是否还有空闲,如果是,唤醒notFull的阻塞线程。
  • 释放完锁后,判断容量是否为空,如果是,唤醒notEmpty的阻塞线程。

put操作

put方法也是向队列尾部插入一个元素。如果元素为null,抛出空指针异常。如果队列己满则阻塞当前线程,直到队列有空闲插入成功为止。如果队列空闲则插入成功,直接返回。如果在阻塞时被其他线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回。

put源代码

  1. public void put(E e) throws InterruptedException {
  2. ////为空直接抛空指针异常
  3. if (e == null) throw new NullPointerException();
  4. int c = -1;
  5. // 构造新节点
  6. Node<E> node = new Node<E>(e);
  7. //获取putLock独占锁
  8. final ReentrantLock putLock = this.putLock;
  9. final AtomicInteger count = this.count;
  10. //获取独占锁,它跟lock的区别,是可以被中断
  11. putLock.lockInterruptibly();
  12. try {
  13. //队列已满线程挂起等待
  14. while (count.get() == capacity) {
  15. notFull.await();
  16. }
  17. //进队列
  18. enqueue(node);
  19. //递增元素计数
  20. c = count.getAndIncrement();
  21. //如果元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程
  22. if (c + 1 < capacity)
  23. notFull.signal();
  24. } finally {
  25. //释放锁
  26. putLock.unlock();
  27. }
  28. //如果容量为0,则
  29. if (c == 0)
  30. //激活 notEmpty 的条件队列,唤醒被阻塞的线程
  31. signalNotEmpty();
  32. }

put流程图

回归Java基础:LinkedBlockingQueue阻塞队列解析_出队_06

基本流程:

  • 判断元素是否为空,如果是就抛出空指针异常。
  • 构造Node节点,上锁(可中断锁)
  • 判断队列是否已满,如果是,阻塞当前线程,一直等待。
  • 如果队列没满,Node节点在队尾加入队列。
  • 加入队列后,判断队列是否还有空闲,如果是,唤醒notFull的阻塞线程。
  • 释放完锁后,判断容量是否为空,如果是,唤醒notEmpty的阻塞线程。

poll操作

从队列头部获取并移除一个元素, 如果队列为空则返回 null, 该方法是不阻塞的。

poll源代码

poll方法源代码

  1. public E poll() {
  2. final AtomicInteger count = this.count;
  3. //如果队列为空,返回null
  4. if (count.get() == 0)
  5. return null;
  6. E x = null;
  7. int c = -1;
  8. //获取takeLock独占锁
  9. final ReentrantLock takeLock = this.takeLock;
  10. takeLock.lock();
  11. try {
  12. //如果队列不为空,则出队,并递减计数
  13. if (count.get() > 0) {
  14. x = dequeue();
  15. c = count.getAndDecrement();
  16. ////容量大于1,则激活 notEmpty 的条件队列,唤醒被阻塞的线程
  17. if (c > 1)
  18. notEmpty.signal();
  19. }
  20. } finally {
  21. //释放锁
  22. takeLock.unlock();
  23. }
  24. if (c == capacity)
  25. //唤醒notFull条件队列里被阻塞的线程
  26. signalNotFull();
  27. return x;
  28. }

dequeue方法源代码

  1. //出队列
  2. private E dequeue() {
  3. //获取head节点
  4. Node<E> h = head;
  5. //获取到head节点指向的下一个节点
  6. Node<E> first = h.next;
  7. //head节点原来指向的节点的next指向自己,等待下次gc回收
  8. h.next = h; // help GC
  9. // head节点指向新的节点
  10. head = first;
  11. // 获取到新的head节点的item值
  12. E x = first.item;
  13. // 新head节点的item值设置为null
  14. first.item = null;
  15. return x;
  16. }

为了形象生动,我们用一张图来描述出队过程。图片参考来源【细谈Java并发】谈谈LinkedBlockingQueue

回归Java基础:LinkedBlockingQueue阻塞队列解析_空指针异常_07

signalNotFull方法源码

  1. private void signalNotFull() {
  2. //获取put独占锁
  3. final ReentrantLock putLock = this.putLock;
  4. putLock.lock();
  5. try {
  6. ////唤醒notFull条件队列里被阻塞的线程
  7. notFull.signal();
  8. } finally {
  9. //释放锁
  10. putLock.unlock();
  11. }
  12. }

poll流程图

回归Java基础:LinkedBlockingQueue阻塞队列解析_空指针异常_08

基本流程:

  • 判断元素是否为空,如果是,就返回null。
  • 加锁
  • 判断队列是否有元素,如果没有,释放锁
  • 如果队列有元素,则出队列,获取数据,容量计数器减一。
  • 判断此时容量是否大于1,如果是,唤醒notEmpty的阻塞线程。
  • 释放完锁后,判断容量是否满,如果是,唤醒notFull的阻塞线程。

peek操作

获取队列头部元素但是不从队列里面移除它,如果队列为空则返回 null。该方法是不阻塞的。

peek源代码

  1. public E peek() {
  2. //队列容量为0,返回null
  3. if (count.get() == 0)
  4. return null;
  5. //获取takeLock独占锁
  6. final ReentrantLock takeLock = this.takeLock;
  7. takeLock.lock();
  8. try {
  9. Node<E> first = head.next;
  10. //判断first是否为null,如果是直接返回
  11. if (first == null)
  12. return null;
  13. else
  14. return first.item;
  15. } finally {
  16. //释放锁
  17. takeLock.unlock();
  18. }
  19. }

peek流程图

回归Java基础:LinkedBlockingQueue阻塞队列解析_空指针异常_09

基本流程:

  • 判断队列容量大小是否为0,如果是,就返回null。
  • 加锁
  • 获取队列头部节点first
  • 判断节点first是否为null,是的话,返回null。
  • 如果fist不为null,返回节点first的元素。
  • 释放锁。

take操作

获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列 不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回。

take源代码

  1. public E take() throws InterruptedException {
  2. E x;
  3. int c = -1;
  4. final AtomicInteger count = this.count;
  5. //获取takeLock独占锁
  6. final ReentrantLock takeLock = this.takeLock;
  7. //获取独占锁,它跟lock的区别,是可以被中断
  8. takeLock.lockInterruptibly();
  9. try {
  10. //当前队列为空,则阻塞挂起
  11. while (count.get() == 0) {
  12. notEmpty.await();
  13. }
  14. //)出队并递减计数
  15. x = dequeue();
  16. c = count.getAndDecrement();
  17. if (c > 1)
  18. //激活 notEmpty 的条件队列,唤醒被阻塞的线程
  19. notEmpty.signal();
  20. } finally {
  21. //释放锁
  22. takeLock.unlock();
  23. }
  24. if (c == capacity)
  25. //激活 notFull 的条件队列,唤醒被阻塞的线程
  26. signalNotFull();
  27. return x;
  28. }

take流程图

回归Java基础:LinkedBlockingQueue阻塞队列解析_空指针异常_10

基本流程:

  • 加锁
  • 判断队列容量大小是否为0,如果是,阻塞当前线程,直到队列不为空。
  • 如果队列容量大小大于0,节点出队列,获取元素x,计数器减一。
  • 判断队列容量大小是否大于1,如果是,唤醒notEmpty的阻塞线程。
  • 释放锁。
  • 判断队列容量是否已满,如果是,唤醒notFull的阻塞线程。
  • 返回出队元素x

remove操作

删除队列里面指定的元素,有则删除并返回 true,没有则返回 false。

remove方法源代码

  1. public boolean remove(Object o) {
  2. //为空直接返回false
  3. if (o == null) return false;
  4. //双重加锁
  5. fullyLock();
  6. try {
  7. //边历队列,找到元素则删除并返回true
  8. for (Node<E> trail = head, p = trail.next;
  9. p != null;
  10. trail = p, p = p.next) {
  11. if (o.equals(p.item)) {
  12. //执行unlink操作
  13. unlink(p, trail);
  14. return true;
  15. }
  16. }
  17. return false;
  18. } finally {
  19. //解锁
  20. fullyUnlock();
  21. }
  22. }

双重加锁,fullyLock方法源代码

  1. void fullyLock() {
  2. //putLock独占锁加锁
  3. putLock.lock();
  4. //takeLock独占锁加锁
  5. takeLock.lock();
  6. }

unlink方法源代码

  1. void unlink(Node<E> p, Node<E> trail) {
  2. p.item = null;
  3. trail.next = p.next;
  4. if (last == p)
  5. last = trail;
  6. //如果当前队列满 ,则删除后,也不忘记唤醒等待的线程
  7. if (count.getAndDecrement() == capacity)
  8. notFull.signal();
  9. }

fullyUnlock方法源代码

  1. void fullyUnlock() {
  2. //与双重加锁顺序相反,先解takeLock独占锁
  3. takeLock.unlock();
  4. putLock.unlock();
  5. }

remove流程图

回归Java基础:LinkedBlockingQueue阻塞队列解析_空指针异常_11

基本流程

  • 判断要删除的元素是否为空,是就返回false。
  • 如果要删除的元素不为空,加双重锁
  • 遍历队列,找到要删除的元素,如果找不到,返回false。
  • 如果找到,删除该节点,返回true。
  • 释放锁

size操作

获取当前队列元素个数。

  1. public int size() {
  2. return count.get();
  3. }

由于进行出队、入队操作时的 count是加了锁的,所以结果相比ConcurrentLinkedQueue 的 size 方法比较准确。

总结

  • LinkedBlockingQueue底层通过单向链表实现。
  • 它有头尾两个节点,入队操作是从尾节点添加元素,出队操作是对头节点进行操作。
  • 它的容量是原子变量count,保证szie获取的准确性。
  • 它有两把独占锁,保证了队列操作原子性。
  • 它的两把锁都配备了一个条件队列,用来存放阻塞线程,结合入队、出队操作实现了一个生产消费模型。

Java并发编程之美中,有一张图惟妙惟肖描述了它,如下图:

回归Java基础:LinkedBlockingQueue阻塞队列解析_流程图_12

参看与感谢

  • 《Java并发编程之美》
  • 阻塞队列之LinkedBlockingQueue
  • Java并发之LinkedBlockingQueue
  • 【细谈Java并发】谈谈LinkedBlockingQueue

个人公众号

回归Java基础:LinkedBlockingQueue阻塞队列解析_空指针异常_13


    举报

    相关推荐

    0 条评论