0
点赞
收藏
分享

微信扫一扫

深入学习java源码之BlockingDeque.addFirst()与BlockingDeque.take()


深入学习java源码之BlockingDeque.addFirst()与BlockingDeque.take()

ava集合类库将集合的接口与实现分离。同样的接口,可以有不同的实现。

Java集合类的基本接口是Collection接口。而Collection接口必须继承java.lang.Iterable接口。

以下图表示集合框架的接口,java.lang以及java.util两个包里的。其他部分可以从左向右看,比如Collection的Subinterfaces有List,Set以及Queue等。

深入学习java源码之BlockingDeque.addFirst()与BlockingDeque.take()_ci

如果一个线程既要向队列中添加元素,又要从同一个队列中取元素,那么BlockingDeque将是非常有用的。如果消费者线程既要从队列的头部取元素,也要从队列的尾部取元素;或者生产者线程需要插入元素到队列的两端,那么BlockingDeque也是非常有用的。

深入学习java源码之BlockingDeque.addFirst()与BlockingDeque.take()_java_02

LinkedBlockingDeque和LinkedBlockingQueue的相同点在于: 
1. 基于链表 
2. 容量可选,不设置的话,就是Int的最大值

和LinkedBlockingQueue的不同点在于: 
1. 双端链表和单链表 
2. 不存在哨兵节点 
3. 一把锁+两个条件BlockingDeque 是java.util.concurrent包中提供的一个接口。该接口表示一个双端队列。该双端队列,线程可以安全的插入,和取出元素。线程插入或者移出队列中的元素时,可能会阻塞。

有两个比较相似的并发阻塞队列,LinkedBlockingQueue和LinkedBlockingDeque,两个都是队列,只不过前者只能一端出一端入,后者则可以两端同时出入,并且都是结构改变线程安全的队列。

LinkedBlockingDeque一个线程可以插入元素到队列的任一端。如果队列full,那么线程将会阻塞,直到其他线程从队列中取出一个元素为止。如果队列empty,那么从队列中取元素的线程将会阻塞,直到其他线程插入一个元素为止。

BlockingDeque继承于BlockingQueue

BlockDeque接口继承自BlcokingQueue。这意味着你可以使用BlockingDeque作为一个BlockingQueue。如果你使用BlockingDeque作为BlockingQueue,那么BlockingQueue的插入操作就是把元素插入到BlockDeque的尾部。移出操作将是移出BlockDeque头部的元素 BlockingDeque实现 

深入学习java源码之BlockingDeque.addFirst()与BlockingDeque.take()_java_03

BlockingDeque是一个接口,必须使用java.util.concurrent包有LinkedBlockingDeque它的实现类

BlockingDeque deque = new LinkedBlockingDeque();
deque.addFirst("1");
deque.addLast("2");
String two = deque.takeLast();
String one = deque.takeFirst();

深入学习java源码之BlockingDeque.addFirst()与BlockingDeque.take()_双端队列_04

LinkedBlockingDeque的lock

LinkedBlockingDeque的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)。LinkedBlockingDeque是一个带有长度的阻塞队列,初始化的时候可以指定队列长度(如果不指定就是Integer.MAX_VALUE),且指定长度之后不允许进行修改。

 

java源码

Modifier and Type

Method and Description

​boolean​

​add(E​

在此deque的末尾插入指定的元素,除非它会违反容量限制。

​void​

​addFirst(E​

插入此双端队列的前面,如果它是立即可行且不会违反容量限制,抛出一个指定的元素 ​​IllegalStateException​​如果当前没有空间可用。

​void​

​addLast(E​

在插入如果它是立即可行且不会违反容量限制,抛出此双端队列的末尾指定元素 ​​IllegalStateException​​如果当前没有空间可用。

​void​

​clear()​

从这个deque原子地删除所有的元素。

​boolean​

​contains(Object​

如果此deque包含指定的元素,则返回 ​​true​​ 。

​Iterator<E>​

​descendingIterator()​

以相反的顺序返回此deque中的元素的迭代器。

​int​

​drainTo(Collection<? super E> c)​

从该队列中删除所有可用的元素,并将它们添加到给定的集合中。

​int​

​drainTo(Collection<? super E> c, int maxElements)​

最多从该队列中删除给定数量的可用元素,并将它们添加到给定的集合中。

​E​

​element()​

检索,但不删除,由这个deque表示的队列的头。

​E​

​getFirst()​

检索,但不删除,这个deque的第一个元素。

​E​

​getLast()​

检索,但不删除,这个deque的最后一个元素。

​Iterator<E>​

​iterator()​

以正确的顺序返回此deque中的元素的迭代器。

​boolean​

​offer(E​

将指定的元素插入由此deque表示的队列(换句话说,在该deque的尾部),如果可以立即执行,而不违反容量限制, ​​true​​​在成功时 ​​false​​如果当前没有可用空间,则返回false。

​boolean​

​offer(E e, long timeout, TimeUnit​

将指定的元素插入由此deque表示的队列中(换句话说,在该deque的尾部),等待指定的等待时间(如果需要空间可用)。

​boolean​

​offerFirst(E​

插入此双端队列的前面,如果它是立即可行且不会违反容量限制,返回指定的元素 ​​true​​​在成功和 ​​false​​ ,如果当前没有空间可用。

​boolean​

​offerFirst(E e, long timeout, TimeUnit​

在此deque的前面插入指定的元素,等待指定的等待时间(如果需要空间可用)。

​boolean​

​offerLast(E​

插入此双端队列的末尾,如果它是立即可行且不会违反容量限制,返回指定的元素 ​​true​​​在成功和 ​​false​​ ,如果当前没有空间可用。

​boolean​

​offerLast(E e, long timeout, TimeUnit​

在此deque的末尾插入指定的元素,如果需要空间可用,等待指定的等待时间。

​E​

​peek()​

检索但不删除由此deque表示的队列的头部(换句话说,此deque的第一个元素),如果此deque为空,则返回 ​​null​​ 。

​E​

​peekFirst()​

检索但不删除此deque的第一个元素,如果此deque为空,则返回 ​​null​​ 。

​E​

​peekLast()​

检索但不删除此deque的最后一个元素,如果此deque为空,则返回 ​​null​​ 。

​E​

​poll()​

检索并删除由此deque表示的队列的头部(换句话说,该deque的第一个元素),如果此deque为空,则返回 ​​null​​ 。

​E​

​poll(long timeout, TimeUnit​

检索并删除由此deque(换句话说,该deque的第一个元素)表示的队列的头部,等待到指定的等待时间(如有必要)使元素变为可用。

​E​

​pollFirst()​

检索并删除此deque的第一个元素,如果此deque为空,则返回 ​​null​​ 。

​E​

​pollFirst(long timeout, TimeUnit​

检索并删除此deque的第一个元素,等待指定的等待时间(如有必要),使元素变为可用。

​E​

​pollLast()​

检索并删除此deque的最后一个元素,如果此deque为空,则返回 ​​null​​ 。

​E​

​pollLast(long timeout, TimeUnit​

检索并删除此deque的最后一个元素,等待到指定的等待时间,如果需要,元素可用。

​E​

​pop()​

从这个deque表示的堆栈中弹出一个元素。

​void​

​push(E​

将元素推入此双端队列表示的堆栈(换句话说,在该双端队列的头部),如果它是立即可行且不会违反容量限制,抛出 ​​IllegalStateException​​如果当前没有空间可用。

​void​

​put(E​

将指定的元素插入由此deque表示的队列(换句话说,在该deque的尾部),等待空格变为可用时。

​void​

​putFirst(E​

在此deque的前面插入指定的元素,如有必要,等待空格变为可用。

​void​

​putLast(E​

在此deque的末尾插入指定的元素,如有必要,等待空格变为可用。

​int​

​remainingCapacity()​

返回此deque可以理想地(在没有内存或资源限制)的情况下接受而不阻止的附加元素数。

​E​

​remove()​

检索并删除由此deque表示的队列的头部。

​boolean​

​remove(Object​

从此deque中删除指定元素的第一个出现。

​E​

​removeFirst()​

检索并删除此deque的第一个元素。

​boolean​

​removeFirstOccurrence(Object​

从此deque中删除指定元素的第一个出现。

​E​

​removeLast()​

检索并删除此deque的最后一个元素。

​boolean​

​removeLastOccurrence(Object​

从此deque中删除指定元素的最后一次出现。

​int​

​size()​

返回此deque中的元素数。

​Spliterator<E>​

​spliterator()​

在此deque中的元素上返回​​Spliterator​​ 。

​E​

​take()​

检索并删除由此deque(换句话说,该deque的第一个元素)表示的队列的头部,如果需要,等待,直到元素可用。

​E​

​takeFirst()​

检索并删除此deque的第一个元素,如有必要等待,直到元素可用。

​E​

​takeLast()​

检索并删除此deque的最后一个元素,如有必要等待,直到元素可用。

​Object[]​

​toArray()​

以适当的顺序(从第一个到最后一个元素)返回一个包含此deque中所有元素的数组。

​<T> T[]​

​toArray(T[] a)​

以适当的顺序返回一个包含此deque中所有元素的数组; 返回的数组的运行时类型是指定数组的运行时类型。

​String​

​toString()​

返回此集合的字符串表示形式。

package java.util.concurrent;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {

private static final long serialVersionUID = -387911632671998426L;
/** 数据结构双向链表节点 */
static final class Node<E> {
/**
* 元素值
*/
E item;
/**
* 节点前驱
* 1.指向前驱;2.指向this,说明前驱是尾节点,看unlinklast;3.指向null说明没有前驱
*/
Node<E> prev;
/**
* 节点后继
* 1.指向后继;2.指向this,说明后继是头结点,看unlinkfirst;3.指向null说明没有后继
*/
Node<E> next;

Node(E x) {
item = x;
}
}

transient Node<E> first;

transient Node<E> last;

/** Number of items in the deque */
private transient int count;

/** Maximum number of items in the deque */
private final int capacity;

/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();

/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();

/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();

public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}

public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}

public LinkedBlockingDeque(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock lock = this.lock;
lock.lock(); // Never contended, but necessary for visibility
try {
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (!linkLast(new Node<E>(e)))
throw new IllegalStateException("Deque full");
}
} finally {
lock.unlock();
}
}


// Basic linking and unlinking operations, called only while holding lock
private boolean linkFirst(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
Node<E> f = first;
node.next = f;
first = node;
if (last == null)
last = node;
else
f.prev = node;
++count;
notEmpty.signal();
return true;
}

private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
notEmpty.signal();
return true;
}

private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
notFull.signal();
return item;
}

private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
Node<E> l = last;
if (l == null)
return null;
Node<E> p = l.prev;
E item = l.item;
l.item = null;
l.prev = l; // help GC
last = p;
if (p == null)
first = null;
else
p.next = null;
--count;
notFull.signal();
return item;
}

void unlink(Node<E> x) {
// assert lock.isHeldByCurrentThread();
Node<E> p = x.prev;
Node<E> n = x.next;
if (p == null) {
unlinkFirst();
} else if (n == null) {
unlinkLast();
} else {
p.next = n;
n.prev = p;
x.item = null;
// Don't mess with x's links. They may still be in use by
// an iterator.
--count;
notFull.signal();
}
}

// BlockingDeque methods
public void addFirst(E e) {
if (!offerFirst(e))
throw new IllegalStateException("Deque full");
}

public void addLast(E e) {
if (!offerLast(e))
throw new IllegalStateException("Deque full");
}

public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkFirst(node);
} finally {
lock.unlock();
}
}

public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkLast(node);
} finally {
lock.unlock();
}
}

public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}

public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}

public boolean offerFirst(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (!linkFirst(node)) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}

public boolean offerLast(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (!linkLast(node)) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}

public E removeFirst() {
E x = pollFirst();
if (x == null) throw new NoSuchElementException();
return x;
}

public E removeLast() {
E x = pollLast();
if (x == null) throw new NoSuchElementException();
return x;
}

public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkFirst();
} finally {
lock.unlock();
}
}

public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast();
} finally {
lock.unlock();
}
}

public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}

public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}

public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}

public E pollLast(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkLast()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}

public E getFirst() {
E x = peekFirst();
if (x == null) throw new NoSuchElementException();
return x;
}

public E getLast() {
E x = peekLast();
if (x == null) throw new NoSuchElementException();
return x;
}

public E peekFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (first == null) ? null : first.item;
} finally {
lock.unlock();
}
}

public E peekLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (last == null) ? null : last.item;
} finally {
lock.unlock();
}
}

public boolean removeFirstOccurrence(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Node<E> p = first; p != null; p = p.next) {
if (o.equals(p.item)) {
unlink(p);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}

public boolean removeLastOccurrence(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Node<E> p = last; p != null; p = p.prev) {
if (o.equals(p.item)) {
unlink(p);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}

// BlockingQueue methods
public boolean add(E e) {
addLast(e);
return true;
}

public boolean offer(E e) {
return offerLast(e);
}

public void put(E e) throws InterruptedException {
putLast(e);
}

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
return offerLast(e, timeout, unit);
}

public E remove() {
return removeFirst();
}

public E poll() {
return pollFirst();
}

public E take() throws InterruptedException {
return takeFirst();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return pollFirst(timeout, unit);
}

public E element() {
return getFirst();
}

public E peek() {
return peekFirst();
}

public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return capacity - count;
} finally {
lock.unlock();
}
}

public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = Math.min(maxElements, count);
for (int i = 0; i < n; i++) {
c.add(first.item); // In this order, in case add() throws.
unlinkFirst();
}
return n;
} finally {
lock.unlock();
}
}

// Stack methods
public void push(E e) {
addFirst(e);
}

/**
* @throws NoSuchElementException {@inheritDoc}
*/
public E pop() {
return removeFirst();
}

// Collection methods
public boolean remove(Object o) {
return removeFirstOccurrence(o);
}

public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}

public boolean contains(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Node<E> p = first; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
lock.unlock();
}
}

// /**
// * Adds all of the elements in the specified collection to this
// * queue. Attempts to addAll of a queue to itself result in
// * {@code IllegalArgumentException}. Further, the behavior of
// * this operation is undefined if the specified collection is
// * modified while the operation is in progress.
// *
// * @param c collection containing elements to be added to this queue
// * @return {@code true} if this queue changed as a result of the call
// * @throws ClassCastException {@inheritDoc}
// * @throws NullPointerException {@inheritDoc}
// * @throws IllegalArgumentException {@inheritDoc}
// * @throws IllegalStateException if this deque is full
// * @see #add(Object)
// */
// public boolean addAll(Collection<? extends E> c) {
// if (c == null)
// throw new NullPointerException();
// if (c == this)
// throw new IllegalArgumentException();
// final ReentrantLock lock = this.lock;
// lock.lock();
// try {
// boolean modified = false;
// for (E e : c)
// if (linkLast(e))
// modified = true;
// return modified;
// } finally {
// lock.unlock();
// }
// }

@SuppressWarnings("unchecked")
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] a = new Object[count];
int k = 0;
for (Node<E> p = first; p != null; p = p.next)
a[k++] = p.item;
return a;
} finally {
lock.unlock();
}
}

@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (a.length < count)
a = (T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(), count);

int k = 0;
for (Node<E> p = first; p != null; p = p.next)
a[k++] = (T)p.item;
if (a.length > k)
a[k] = null;
return a;
} finally {
lock.unlock();
}
}

public String toString() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Node<E> p = first;
if (p == null)
return "[]";

StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
E e = p.item;
sb.append(e == this ? "(this Collection)" : e);
p = p.next;
if (p == null)
return sb.append(']').toString();
sb.append(',').append(' ');
}
} finally {
lock.unlock();
}
}

public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Node<E> f = first; f != null; ) {
f.item = null;
Node<E> n = f.next;
f.prev = null;
f.next = null;
f = n;
}
first = last = null;
count = 0;
notFull.signalAll();
} finally {
lock.unlock();
}
}

public Iterator<E> iterator() {
return new Itr();
}

public Iterator<E> descendingIterator() {
return new DescendingItr();
}

private abstract class AbstractItr implements Iterator<E> {

Node<E> next;

E nextItem;

private Node<E> lastRet;

abstract Node<E> firstNode();
abstract Node<E> nextNode(Node<E> n);

AbstractItr() {
// set to initial position
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
next = firstNode();
nextItem = (next == null) ? null : next.item;
} finally {
lock.unlock();
}
}

private Node<E> succ(Node<E> n) {
// Chains of deleted nodes ending in null or self-links
// are possible if multiple interior nodes are removed.
for (;;) {
Node<E> s = nextNode(n);
if (s == null)
return null;
else if (s.item != null)
return s;
else if (s == n)
return firstNode();
else
n = s;
}
}

void advance() {
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
// assert next != null;
next = succ(next);
nextItem = (next == null) ? null : next.item;
} finally {
lock.unlock();
}
}

public boolean hasNext() {
return next != null;
}

public E next() {
if (next == null)
throw new NoSuchElementException();
lastRet = next;
E x = nextItem;
advance();
return x;
}

public void remove() {
Node<E> n = lastRet;
if (n == null)
throw new IllegalStateException();
lastRet = null;
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
if (n.item != null)
unlink(n);
} finally {
lock.unlock();
}
}
}

private class Itr extends AbstractItr {
Node<E> firstNode() { return first; }
Node<E> nextNode(Node<E> n) { return n.next; }
}

private class DescendingItr extends AbstractItr {
Node<E> firstNode() { return last; }
Node<E> nextNode(Node<E> n) { return n.prev; }
}

/** A customized variant of Spliterators.IteratorSpliterator */
static final class LBDSpliterator<E> implements Spliterator<E> {
static final int MAX_BATCH = 1 << 25; // max batch array size;
final LinkedBlockingDeque<E> queue;
Node<E> current; // current node; null until initialized
int batch; // batch size for splits
boolean exhausted; // true when no more nodes
long est; // size estimate
LBDSpliterator(LinkedBlockingDeque<E> queue) {
this.queue = queue;
this.est = queue.size();
}

public long estimateSize() { return est; }

public Spliterator<E> trySplit() {
Node<E> h;
final LinkedBlockingDeque<E> q = this.queue;
int b = batch;
int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
if (!exhausted &&
((h = current) != null || (h = q.first) != null) &&
h.next != null) {
Object[] a = new Object[n];
final ReentrantLock lock = q.lock;
int i = 0;
Node<E> p = current;
lock.lock();
try {
if (p != null || (p = q.first) != null) {
do {
if ((a[i] = p.item) != null)
++i;
} while ((p = p.next) != null && i < n);
}
} finally {
lock.unlock();
}
if ((current = p) == null) {
est = 0L;
exhausted = true;
}
else if ((est -= i) < 0L)
est = 0L;
if (i > 0) {
batch = i;
return Spliterators.spliterator
(a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
Spliterator.CONCURRENT);
}
}
return null;
}

public void forEachRemaining(Consumer<? super E> action) {
if (action == null) throw new NullPointerException();
final LinkedBlockingDeque<E> q = this.queue;
final ReentrantLock lock = q.lock;
if (!exhausted) {
exhausted = true;
Node<E> p = current;
do {
E e = null;
lock.lock();
try {
if (p == null)
p = q.first;
while (p != null) {
e = p.item;
p = p.next;
if (e != null)
break;
}
} finally {
lock.unlock();
}
if (e != null)
action.accept(e);
} while (p != null);
}
}

public boolean tryAdvance(Consumer<? super E> action) {
if (action == null) throw new NullPointerException();
final LinkedBlockingDeque<E> q = this.queue;
final ReentrantLock lock = q.lock;
if (!exhausted) {
E e = null;
lock.lock();
try {
if (current == null)
current = q.first;
while (current != null) {
e = current.item;
current = current.next;
if (e != null)
break;
}
} finally {
lock.unlock();
}
if (current == null)
exhausted = true;
if (e != null) {
action.accept(e);
return true;
}
}
return false;
}

public int characteristics() {
return Spliterator.ORDERED | Spliterator.NONNULL |
Spliterator.CONCURRENT;
}
}

public Spliterator<E> spliterator() {
return new LBDSpliterator<E>(this);
}

private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// Write out capacity and any hidden stuff
s.defaultWriteObject();
// Write out all elements in the proper order.
for (Node<E> p = first; p != null; p = p.next)
s.writeObject(p.item);
// Use trailing null as sentinel
s.writeObject(null);
} finally {
lock.unlock();
}
}

private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
count = 0;
first = null;
last = null;
for (;;) {
@SuppressWarnings("unchecked")
E item = (E)s.readObject();
if (item == null)
break;
add(item);
}
}
}

 

Modifier and Type

Method and Description

​boolean​

​add(E​

将指定的元素插入到此队列中,如果可以立即执行此操作而不违反容量限制, 则在成功后返回 true,如果当前没有可用空间,则抛出IllegalStateException。

​boolean​

​addAll(Collection<? extends E> c)​

将指定集合中的所有元素添加到此队列中。

​void​

​clear()​

从此队列中删除所有元素。

​E​

​element()​

检索,但不删除,这个队列的头。

​E​

​remove()​

检索并删除此队列的头。

package java.util;

public abstract class AbstractQueue<E>
extends AbstractCollection<E>
implements Queue<E> {

protected AbstractQueue() {
}

public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

public void clear() {
while (poll() != null)
;
}

public boolean addAll(Collection<? extends E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
boolean modified = false;
for (E e : c)
if (add(e))
modified = true;
return modified;
}

}

 

Modifier and Type

Method and Description

​boolean​

​add(E​

将指定的元素插入到此队列中,如果可以立即执行此操作,而不会违反容量限制, ​​true​​​在成功后返回 ​​IllegalStateException​​如果当前没有可用空间,则抛出IllegalStateException。

​E​

​element()​

检索,但不删除,这个队列的头。

​boolean​

​offer(E​

如果在不违反容量限制的情况下立即执行,则将指定的元素插入到此队列中。

​E​

​peek()​

检索但不删除此队列的头,如果此队列为空,则返回 ​​null​​ 。

​E​

​poll()​

检索并删除此队列的头,如果此队列为空,则返回 ​​null​​ 。

​E​

​remove()​

检索并删除此队列的头。

package java.util;

public interface Queue<E> extends Collection<E> {

boolean add(E e);

boolean offer(E e);

E remove();

E poll();

E element();

E peek();
}

 

Modifier and Type

Method and Description

​boolean​

​add(E​

将指定的元素插入由此deque表示的队列(换句话说,在该deque的尾部),如果可以立即执行,而不违反容量限制, ​​true​​​在成功后返回 ​​IllegalStateException​​如果当前没有可用空间,则抛出IllegalStateException 。

​void​

​addFirst(E​

插入此双端队列的前面,如果它是立即可行且不会违反容量限制,抛出一个指定的元素 ​​IllegalStateException​​如果当前没有空间可用。

​void​

​addLast(E​

在插入如果它是立即可行且不会违反容量限制,抛出此双端队列的末尾指定元素 ​​IllegalStateException​​如果当前没有空间可用。

​boolean​

​contains(Object​

如果此deque包含指定的元素,则返回 ​​true​​ 。

​E​

​element()​

检索但不删除由此deque表示的队列的头部(换句话说,该deque的第一个元素)。

​Iterator<E>​

​iterator()​

以正确的顺序返回此deque中的元素的迭代器。

​boolean​

​offer(E​

将指定的元素插入由此deque表示的队列(换句话说,在该deque的尾部),如果可以立即执行,而不违反容量限制, ​​true​​​在成功时 ​​false​​如果当前没有可用空间,则返回false。

​boolean​

​offer(E e, long timeout, TimeUnit​

将指定的元素插入由此deque表示的队列中(换句话说,在该deque的尾部),等待指定的等待时间(如果需要空间可用)。

​boolean​

​offerFirst(E​

插入此双端队列的前面,如果它是立即可行且不会违反容量限制,返回指定的元素 ​​true​​​在成功和 ​​false​​ ,如果当前没有空间可用。

​boolean​

​offerFirst(E e, long timeout, TimeUnit​

在此deque的前面插入指定的元素,等待指定的等待时间(如果需要空间可用)。

​boolean​

​offerLast(E​

插入此双端队列的末尾,如果它是立即可行且不会违反容量限制,返回指定的元素 ​​true​​​在成功和 ​​false​​ ,如果当前没有空间可用。

​boolean​

​offerLast(E e, long timeout, TimeUnit​

在此deque的末尾插入指定的元素,如果需要空间可用,等待指定的等待时间。

​E​

​peek()​

检索但不删除由此deque表示的队列的头部(换句话说,该deque的第一个元素),如果此deque为空,则返回 ​​null​​ 。

​E​

​poll()​

检索并删除由此deque(换句话说,此deque的第一个元素)表示的队列的 ​​null​​​如果此deque为空,则返回 ​​null​​ 。

​E​

​poll(long timeout, TimeUnit​

检索并删除由此deque(换句话说,该deque的第一个元素)表示的队列的头部,等待到指定的等待时间(如有必要)使元素变为可用。

​E​

​pollFirst(long timeout, TimeUnit​

检索并删除此deque的第一个元素,等待指定的等待时间(如有必要),使元素变为可用。

​E​

​pollLast(long timeout, TimeUnit​

检索并删除此deque的最后一个元素,等待到指定的等待时间,如果需要,元素可用。

​void​

​push(E​

将元素推送到由此deque表示的堆栈(换句话说,在该deque的头部),如果可以立即执行,而不违反容量限制,则抛出 ​​IllegalStateException​​如果当前没有可用空间)。

​void​

​put(E​

将指定的元素插入由此deque表示的队列(换句话说,在该deque的尾部),等待空格变为可用时。

​void​

​putFirst(E​

在此deque的前面插入指定的元素,如有必要,等待空格变为可用。

​void​

​putLast(E​

在此deque的末尾插入指定的元素,如有必要,等待空格变为可用。

​E​

​remove()​

检索并删除由此deque表示的队列的头(换句话说,该deque的第一个元素)。

​boolean​

​remove(Object​

从此deque中删除指定元素的第一个出现。

​boolean​

​removeFirstOccurrence(Object​

从此deque中删除指定元素的第一个出现。

​boolean​

​removeLastOccurrence(Object​

从此deque中删除指定元素的最后一次出现。

​int​

​size()​

返回此deque中的元素数。

​E​

​take()​

检索并删除由此deque(换句话说,该deque的第一个元素)表示的队列的头部,如果需要,等待,直到元素可用。

​E​

​takeFirst()​

检索并删除此deque的第一个元素,如有必要等待,直到元素可用。

​E​

​takeLast()​

检索并删除此deque的最后一个元素,如有必要等待,直到元素可用。

package java.util.concurrent;
import java.util.*;

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {

void addFirst(E e);

void addLast(E e);

boolean offerFirst(E e);

boolean offerLast(E e);

void putFirst(E e) throws InterruptedException;

void putLast(E e) throws InterruptedException;

boolean offerFirst(E e, long timeout, TimeUnit unit)
throws InterruptedException;

boolean offerLast(E e, long timeout, TimeUnit unit)
throws InterruptedException;

E takeFirst() throws InterruptedException;

E takeLast() throws InterruptedException;

E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException;

E pollLast(long timeout, TimeUnit unit)
throws InterruptedException;

boolean removeFirstOccurrence(Object o);

boolean removeLastOccurrence(Object o);

boolean add(E e);

boolean offer(E e);

void put(E e) throws InterruptedException;

boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

E remove();

E poll();

E take() throws InterruptedException;

E poll(long timeout, TimeUnit unit)
throws InterruptedException;

E element();

E peek();

boolean remove(Object o);

public boolean contains(Object o);

public int size();

Iterator<E> iterator();

void push(E e);
}

 

Modifier and Type

Method and Description

​boolean​

​add(E​

将指定的元素插入到此队列中,如果可以立即执行此操作而不违反容量限制, ​​true​​​在成功后返回 ​​IllegalStateException​​如果当前没有可用空间,则抛出IllegalStateException。

​boolean​

​contains(Object​

如果此队列包含指定的元素,则返回 ​​true​​ 。

​int​

​drainTo(Collection<? super E> c)​

从该队列中删除所有可用的元素,并将它们添加到给定的集合中。

​int​

​drainTo(Collection<? super E> c, int maxElements)​

最多从该队列中删除给定数量的可用元素,并将它们添加到给定的集合中。

​boolean​

​offer(E​

将指定的元素插入到此队列中,如果可以立即执行此操作,而不会违反容量限制, ​​true​​​在成功时 ​​false​​如果当前没有可用空间,则返回false。

​boolean​

​offer(E e, long timeout, TimeUnit​

将指定的元素插入到此队列中,等待指定的等待时间(如有必要)才能使空间变得可用。

​E​

​poll(long timeout, TimeUnit​

检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用。

​void​

​put(E​

将指定的元素插入到此队列中,等待空格可用。

​int​

​remainingCapacity()​

返回该队列最好可以(在没有存储器或资源约束)接受而不会阻塞,或附加的元素的数量 ​​Integer.MAX_VALUE​​如果没有固有的限制。

​boolean​

​remove(Object​

从该队列中删除指定元素的单个实例(如果存在)。

​E​

​take()​

检索并删除此队列的头,如有必要,等待元素可用。

package java.util.concurrent;

import java.util.Collection;
import java.util.Queue;


public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);

boolean offer(E e);

void put(E e) throws InterruptedException;

boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

E take() throws InterruptedException;

E poll(long timeout, TimeUnit unit)
throws InterruptedException;

int remainingCapacity();

boolean remove(Object o);

public boolean contains(Object o);

int drainTo(Collection<? super E> c);

int drainTo(Collection<? super E> c, int maxElements);
}

 

Modifier and Type

Method and Description

​boolean​

​add(E​

将指定的元素插入此双端队列表示的队列中(换句话说,在此双端队列的尾部),如果它是立即可行且不会违反容量限制,返回 ​​true​​​在成功时和抛出 ​​IllegalStateException​​如果当前没有空间可用的。

​void​

​addFirst(E​

插入此双端队列的前面,如果它是立即可行且不会违反容量限制,抛出一个指定的元素 ​​IllegalStateException​​如果当前没有空间可用。

​void​

​addLast(E​

在插入如果它是立即可行且不会违反容量限制,抛出此双端队列的末尾指定元素 ​​IllegalStateException​​如果当前没有空间可用。

​boolean​

​contains(Object​

如果此deque包含指定的元素,则返回 ​​true​​ 。

​Iterator<E>​

​descendingIterator()​

以相反的顺序返回此deque中的元素的迭代器。

​E​

​element()​

检索但不删除由此deque表示的队列的头部(换句话说,该deque的第一个元素)。

​E​

​getFirst()​

检索,但不删除,这个deque的第一个元素。

​E​

​getLast()​

检索,但不删除,这个deque的最后一个元素。

​Iterator<E>​

​iterator()​

以正确的顺序返回此deque中的元素的迭代器。

​boolean​

​offer(E​

将指定的元素插入由此deque表示的队列(换句话说,在该deque的尾部),如果可以立即执行,而不违反容量限制, ​​true​​​在成功时 ​​false​​如果当前没有可用空间,则返回false。

​boolean​

​offerFirst(E​

在此deque的前面插入指定的元素,除非它会违反容量限制。

​boolean​

​offerLast(E​

在此deque的末尾插入指定的元素,除非它会违反容量限制。

​E​

​peek()​

检索但不删除由此deque表示的队列的头部(换句话说,此deque的第一个元素),如果此deque为空,则返回 ​​null​​ 。

​E​

​peekFirst()​

检索,但不删除,此deque的第一个元素,或返回 ​​null​​如果这个deque是空的。

​E​

​peekLast()​

检索但不删除此deque的最后一个元素,如果此deque为空,则返回 ​​null​​ 。

​E​

​poll()​

检索并删除由此deque(换句话说,此deque的第一个元素)表示的队列的 ​​null​​​如果此deque为空,则返回 ​​null​​ 。

​E​

​pollFirst()​

检索并删除此deque的第一个元素,如果此deque为空,则返回 ​​null​​ 。

​E​

​pollLast()​

检索并删除此deque的最后一个元素,如果此deque为空,则返回 ​​null​​ 。

​E​

​pop()​

从这个deque表示的堆栈中弹出一个元素。

​void​

​push(E​

将元素推送到由此deque表示的堆栈(换句话说,在此deque的头部),如果可以立即执行此操作而不违反容量限制,则抛出 ​​IllegalStateException​​如果当前没有可用空间)。

​E​

​remove()​

检索并删除由此deque表示的队列的头(换句话说,该deque的第一个元素)。

​boolean​

​remove(Object​

从此deque中删除指定元素的第一个出现。

​E​

​removeFirst()​

检索并删除此deque的第一个元素。

​boolean​

​removeFirstOccurrence(Object​

从此deque中删除指定元素的第一个出现。

​E​

​removeLast()​

检索并删除此deque的最后一个元素。

​boolean​

​removeLastOccurrence(Object​

从此deque中删除指定元素的最后一次出现。

​int​

​size()​

返回此deque中的元素数。

package java.util;


public interface Deque<E> extends Queue<E> {

void addFirst(E e);

void addLast(E e);

boolean offerFirst(E e);

boolean offerLast(E e);

E removeFirst();

E removeLast();

E pollFirst();

E pollLast();

E getFirst();

E getLast();

E peekFirst();

E peekLast();

boolean removeFirstOccurrence(Object o);

boolean removeLastOccurrence(Object o);

// *** Queue methods ***
boolean add(E e);

boolean offer(E e);

E remove();

E poll();

E element();

E peek();


// *** Stack methods ***
void push(E e);

E pop();


// *** Collection methods ***
boolean remove(Object o);

boolean contains(Object o);

public int size();

Iterator<E> iterator();

Iterator<E> descendingIterator();
}

 

 

 

 

 

 

 

 

 

 

 

 

举报

相关推荐

0 条评论