0
点赞
收藏
分享

微信扫一扫

Java并发编程——LinkedBlockingQueue

一、阻塞队列 BlockingQueue

在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

1.1、BlockingQueue的基本原理

先来解释一下阻塞队列:

如上图:

  • 1、生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
  • 2、消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

查阅BlockingQueue总结了以下阻塞队列的方法:

1、boolean add(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回IllegalStateException异常。

2、boolean offer(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回false。

3、void put(E e)

  • 直接在队列中插入元素,当无可用空间时候,阻塞等待。

4、boolean offer(E e, long timeout, TimeUnit unit)

  • 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false。

5、E take()

  • 获取并移除队列头部的元素,无元素时候阻塞等待。

6、E poll( long time, timeunit unit)

  • 获取并移除队列头部的元素,无元素时候阻塞等待指定时间。

7、boolean remove()

  • 获取并移除队列头部的元素,无元素时候会抛出NoSuchElementException异常。

8、E element()

  • 不移除的情况下返回列头部的元素,无元素时候会抛出NoSuchElementException异常。

9、E peek()

  • 不移除的情况下返回列头部的元素,队列为空无元素时返回null。

注意:

根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。 以上支持阻塞和超时的方法都是能够响应中断的。

1.2、BlockingQueue的实现

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

 

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

二、LinkedBlockingQueue

LinkedBlockingQueue也是一个阻塞队列,相比于ArrayBlockingQueue,他的底层是使用链表(单向链表)实现的,而且是一个可有界可无界的队列,在生产和消费的时候使用了两把锁,提高并发,是一个高效的阻塞队列。

 

LinkedBlockingQueue底层的数据结构是链表,这一点很容易验证,在源码中,我们可以看到它有一个内部类Node,基本源码如下所示:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
	
	//链表节点定义	
    static class Node<E> {
		//节点中存放的值
        E item;

        //下一个节点
		/**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }
}	

从上面的注释可以知道,当某个node节点的next节点为null的时候,说明当前节点是最后一个节点。

LinkedBlockingQueue的基本成员属性如下代码所示:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 队列容量,最大为Integer.MAX_VALUE */
    private final int capacity;

    /** 队列长度 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 头结点
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * 尾结点
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** 移除操作的锁,take/poll方法用到 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 移除操作需要等待的条件notEmpty,与takeLock绑定 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 入队操作的锁,put/offer方法用到 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 入队操作需要等待的条件notFull,与putLock绑定 */
    private final Condition notFull = putLock.newCondition();
}	

可以看到,LinkedBlockingQueue内部是用单向链表实现的,并且它有两把锁:takeLock和putLock,以及对应的两个等待条件:notEmpty和notFull。takeLock控制同一时刻只有一个线程从队列头部获取/移除元素,putLock控制同一时刻只有一个线程在队列尾部添加元素。

2.1、构造函数

  • 容量大小可以由构造函数的capacity设定,默认为:Integer.MAX_VALUE
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
		
    public LinkedBlockingQueue() {
		// 调用有参构造函数,初始化容量capacity为int最大值
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
		// 容量不能小于0,注意也不能等于0,这点与常规的集合不同
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
		// 初始化头结点和尾结点为哑节点
        last = head = new Node<E>(null);
    }
	
	// 将已有的集合全部入队
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
		// 获取到putLock锁
        final ReentrantLock putLock = this.putLock;
		// 加锁,保证线程安全
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
				// 节点内的值不能为null
                if (e == null)
                    throw new NullPointerException();
				// 判断队列是否满了	
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
				// 将Node节点添加到队列的尾部,last = last.next = new Node<E>(e);	
                enqueue(new Node<E>(e));
                ++n;
            }
			// 原子类设置Node节点个数,线程安全
            count.set(n);
        } finally {
			// 解锁
            putLock.unlock();
        }
    }	
}	

2.2、阻塞入队

LinkedBlockingQueue提供的入队的方法有多个,包括add、offer、put。

2.2.1、add(E e)方法

其中add(E e)调用的就是offer(E e),offer方法入队成功返回true,入队失败(队列已满或者阻塞超时)会返回false,那么add方法调用offer方法返回false的话,那么就抛出异常,代码如下:

public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {
	
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
}

2.2.2、offer(E e)方法

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public boolean offer(E e) {
		// 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
		// 获取队列元素个数
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
			//如果已经满了,直接返回失败
            return false;
		// 预先设置c为 -1,约定负数为入队失败	
        int c = -1;
        Node<E> node = new Node<E>(e);
		// 获取入队锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
			//双重判断
            if (count.get() < capacity) {
				//加入链表
                enqueue(node);
                c = count.getAndIncrement();
				// 队列元素个数+1,此时c为元素入队前的个数,也就是比当前队列元素个数少1
                if (c + 1 < capacity)
					//唤醒生产者线程,继续插入
					// 如果添加数据后还队列还没有满,
					//则继续调用notFull的signal方法唤醒其他等待在入队的线程,继续插入
                    notFull.signal();
            }
        } finally {
			// 释放锁
            putLock.unlock();
        }
        if (c == 0)
			//说明里面有一个元素,唤醒消费者
            signalNotEmpty();
        return c >= 0;
    }
}	

2.2.3、offer(E e, long timeout, TimeUnit unit)方法

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
		
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
		// 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
		// 预先设置c为 -1,约定负数为入队失败
        int c = -1;
		// 获取入队锁
        final ReentrantLock putLock = this.putLock;
		// 获取队列元素个数
        final AtomicInteger count = this.count;
		// 加锁
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
				// 如果超时时间过了队列仍然是满的话就直接返回false
                if (nanos <= 0)
                    return false;
				// 否则调用awaitNanos等待,超时会返回<= 0L	
                nanos = notFull.awaitNanos(nanos);
            }
			// 如果上述没有阻塞,也就是队列没有满,那么这里直接入队
            enqueue(new Node<E>(e));
			// 队列元素个数+1,此时c为元素入队前的个数,也就是比当前队列元素个数少1
            c = count.getAndIncrement();
            if (c + 1 < capacity)
				// 如果添加数据后还队列还没有满,
				//则继续调用notFull的signal方法唤醒其他等待在入队的线程
                notFull.signal();
        } finally {
			// 释放锁
            putLock.unlock();
        }
		// c==0说明队列中有一个元素了,那么就需要唤醒其他正在等待出队的线程
		// 这一点可能不好理解,c = count.getAndIncrement();理解了就差不多
        if (c == 0)
            signalNotEmpty();
        return true;
    }
}	

我们一起总结一下上述的入队源码:

  • 1、入队第一步,上锁,这样保证了线程安全,保证了同一时刻只能有一个入队线程在操作队列。

  • 2、如果队列满了,那么会产生阻塞,如果阻塞时间过了,队列依旧是满的,那么将返回false,放弃入队。

  • 3、如果队列没有满,那么直接将入队元素加入到队列的尾部,然后检查当前队列是否满了,如果没有满,则唤醒其他入队线程。

  • 4、最后检查入队前的队列是否为空(c==0就表示当前入队操作前,是一个空队列),如果为空,那么就有可能存在等待出队的线程在阻塞着,那么在这里进行唤醒。

2.2.4、put(E e)方法

对于put方法,它也是入队的一个方法,这个方法和offer方法原理几乎一致,最大的区别在于put方法没有阻塞超时时间,如果队列满了,那么执行put方法的线程将一直阻塞下去。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public void put(E e) throws InterruptedException {
		// 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
		// 预先设置c为 -1,约定负数为入队失败
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
		// 使用AtomicInteger保证原子性
        final AtomicInteger count = this.count;
		// 获取put锁
        putLock.lockInterruptibly();
        try {
			// 如果队列满了,则进入put条件队列等待
            while (count.get() == capacity) {
                notFull.await();
            }
			// 队列不满,或者被取数线程唤醒了,那么会继续执行
			// 这里会往阻塞队列末尾添加一个数据
            enqueue(node);
            c = count.getAndIncrement();
			// 如果队列不满,则唤醒等待时间最长的put线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
			// 释放put锁
            putLock.unlock();
        }
		// 如果队列为空,再次获取put锁,然后唤醒等待时间最长的put线程
        if (c == 0)
            signalNotEmpty();
    }
	
	//直接放到链表的尾部
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }	
}	

2.3、阻塞出队

2.3.1、remove()方法

public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {
    
    public E remove() 
        // 调用poll()方法出队
        E x = poll();
        if (x != null)
            // 如果有元素出队就返回这个元素
            return x;
        else
            // 如果没有元素出队就抛出异常
            throw new NoSuchElementException();
    }
}

2.3.2、poll()方法

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
		
    public E poll() {
        final AtomicInteger count = this.count;
		//如果队列为空,直接返回空
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
		// 获取take锁
        final ReentrantLock takeLock = this.takeLock;
		// 上锁
        takeLock.lock();
        try {
			// 如果队列不空
            if (count.get() > 0) {
				//调用dequeue获取队列中的数据
                x = dequeue();
				// 阻塞队列数量减1
                c = count.getAndDecrement();
				// 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程
                if (c > 1)
					// 释放take锁
                    notEmpty.signal();
            }
        } finally {
			// 解锁
            takeLock.unlock();
        }
		// 如果c == capacity就是说队列中有一个空位,唤醒入队线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
}	

2.3.3、poll(long timeout, TimeUnit unit)方法

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
	
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
		// 获取take锁
        final ReentrantLock takeLock = this.takeLock;
		// 上锁
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
				// 如果队列空了,则进入take条件队列等待
				// 且如果阻塞时间过期,那么将返回null
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
			// 在超时时间内返回,则调用dequeue获取队列中的数据
            x = dequeue();
			// 阻塞队列数量减1
            c = count.getAndDecrement();
			// 如果c > 1,说明队列中还有节点元素,那么继续唤醒其他出队线程
            if (c > 1)
                notEmpty.signal();
        } finally {
			// 解锁
            takeLock.unlock();
        }
		// 如果c == capacity就是说队列中有一个空位,唤醒入队线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
}	

2.3.4、take()方法

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
		// 获取take锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
			// 如果队列空了,则进入take条件队列等待
            while (count.get() == 0) {
                notEmpty.await();
            }
			// 获取到第一个节点,非哑节点
            x = dequeue();
			// 阻塞队列数量减1
            c = count.getAndDecrement();
			// 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程
            if (c > 1)
                notEmpty.signal();
        } finally {
			// 释放take锁
            takeLock.unlock();
        }
		// 如果队列满了,再次获取take锁,然后唤醒等待时间最长的take线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
	
	//通过这个方法可以看出,链表的首节点的值是null,每次获取元素的时候
	//先把首节点干掉,然后从第二个节点获取值
    private E dequeue() {
        Node<E> h = head;
		// 获取第一个元素结点first
        Node<E> first = h.next;
		// 将头结点自引用,并被垃圾回收掉
        h.next = h; // help GC
		// 将头结点指向第一个元素结点first
        head = first;
		// 获取第一个元素结点的值
        E x = first.item;
		// 将第一个元素结点的值置为null,成为新的哑节点
        first.item = null;
		// 返回被移除的节点元素值
        return x;
    }	
}	

take和put操作如下图所示:

  • 1、队列第一个节点为哑节点,占位用的;
  • 2、put操作一直往链表后面追加节点;
  • 3、take操作从链表头取节点;

三、ArrayBlockingQueue与LinkedBlockingQueue对比

队列 是否阻塞 是否有界 线程安全 适用场景
ArrayBlockingQueue 一把ReentrantLock锁 生产消费模型,平衡处理速度
LinkedBlockingQueue 可配置 两把ReentrantLock锁 生产消费模型,平衡处理速度

3.1、ArrayBlockingQueue

  • 数据结构:数组,存储空间预先分配,无需动态申请空间,使用过程中内存开销较小;

3.2、LinkedBlockingQueue:

  • 数据结构:单项链表,存储空间动态申请,会增加JVM垃圾回收负担;
  • 两把锁,并发性能较好;
  • 可设置为无界,吞吐量比较大,但是不稳定,入队速度太快有可能导致内存溢出。

参考: https://www.itzhai.com/articles/graphical-blocking-queue.html

https://segmentfault.com/a/1190000039174436

https://cloud.tencent.com/developer/article/1609320

举报

相关推荐

0 条评论