DelayQueue是一个阻塞队列,其实现了BlockingQueue接口。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
}
添加到延迟队列中的元素必须实现Delayed接口,该接口有2个方法:
// 获取元素的延误时间
long getDelay(TimeUnit unit);
// 元素比较
public int compareTo(T o);
- 为了将最早过期的元素放置在队列头部,DelayQueue基于PriorityQueue优先队列来维护队列中的元素;
- DelayQueue基于ReentrantLock的Condition实现了阻塞。
入队方法
offer方法将元素插入到队列中,并返回是否成功插入。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 执行优先队列的offer方法,将元素插入
q.offer(e);
// 如果优先队列的首元素为该元素
if (q.peek() == e) {
// 将leader置为null,代表可以获取该元素
leader = null;
// 通知工作线程来获取该元素
available.signal();
}
// 返回true
return true;
} finally {
lock.unlock();
}
}
put方法为阻塞队列(BlockingQueue)的方法,其直接调用的offer方法:
public void put(E e) {
offer(e);
}
offer(E e, long timeout, TimeUnit unit)也属于阻塞队列(BlockingQueue)的方法,入参需要额外指定超时时间timeout,但仍然直接调用的offer方法,timeout未发挥作用。
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
由于底层存储元素的PriorityQueue优先队列相当于是"无界"队列(最多Integer.MAX_VALUE-8个元素),所以添加元素的阻塞与非阻塞实现是一致的。
出队方法
poll()为非阻塞方法,若首元素的延迟时间未到,则直接返回null:
public E poll() {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 获取优先队列的首元素
E first = q.peek();
// 如果首元素为null或者首元素的延迟时间>0,则直接返回null
// 如果首元素不为null,且首元素的延迟时间<=0,则执行q.poll(),返回优先队列的首元素
return (first == null || first.getDelay(NANOSECONDS) > 0)
? null
: q.poll();
} finally {
// 释放锁
lock.unlock();
}
}
而poll(long timeout, TimeUnit unit)则是阻塞方法,timeout为超时时间,其会在timeout时间内反复尝试获取优先队列的首元素。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
// 若优先队列首元素为null
if (first == null) {
// 超时时间nanos<=0,代表已超时,直接返回null
if (nanos <= 0L)
return null;
// 否则阻塞等待nanos纳秒
else
nanos = available.awaitNanos(nanos);
// 若预先队列首元素不为null
} else {
// 获取首元素的延迟时间
long delay = first.getDelay(NANOSECONDS);
// 如果延迟时间已到,代表任务已到期,则不管timeout是否已经<=0,均直接执行q.poll()
if (delay <= 0L)
return q.poll();
// 任务未到期,但timeout已经<=0,直接返回null
if (nanos <= 0L)
return null;
first = null; // don't retain ref while waiting
// 若nanos<delay或者leader不为null,阻塞等待nanos纳秒
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
// 阻塞唤醒之后,继续进入到for(;;)循环,此时nanos<=0,然后会返回null
else {
// 若nanos>=delay且leader为null
// 则将当前线程设置为leader,并阻塞等待delay纳秒
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
// 被唤醒之后,更新nanos为nanos-(delay - timeLeft),继续执行新一轮的for(;;)
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader已置为null且优先队列q中首元素不为null,则唤醒等待的线程,并释放锁
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
take()是阻塞方法,若首元素的延迟时间未到,则持续阻塞:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 添加可中断锁
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
// 若优先队列为空,代表无可消费的元素,则消费线程执行awit()方法阻塞等待
if (first == null)
available.await();
// 若优先队列不为空
else {
// 获取优先队列首元素的过期时间
long delay = first.getDelay(NANOSECONDS);
// 若已过期,则直接返回优先队列的首元素
if (delay <= 0L)
return q.poll();
first = null; // don't retain ref while waiting
// 若leader不为空,则证明已有线程在拉取首元素,当前线程阻塞
if (leader != null)
available.await();
else {
// 若leader不为空,则将leader设置为自身线程,然后await delay
// 延迟时间到达之后,会重新进入到for(;;) 此时获取到first的delay<=0,执行q.poll()返回首元素
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader已置为null且优先队列q中首元素不为null,则唤醒等待的线程,并释放锁
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
可以看到。take()阻塞方法是若队列为空,则持续await阻塞挂起,直到被队列中添加元素的线程signal唤醒;