0
点赞
收藏
分享

微信扫一扫

DelayQueue源码分析

松鼠树屋 2022-01-18 阅读 38
java

DelayQueue是一个阻塞队列,其实现了BlockingQueue接口。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
	
}

添加到延迟队列中的元素必须实现Delayed接口,该接口有2个方法:

// 获取元素的延误时间
long getDelay(TimeUnit unit);

// 元素比较
public int compareTo(T o);
  • 为了将最早过期的元素放置在队列头部,DelayQueue基于PriorityQueue优先队列来维护队列中的元素;
  • DelayQueue基于ReentrantLock的Condition实现了阻塞。

入队方法

offer方法将元素插入到队列中,并返回是否成功插入。

public boolean offer(E e) {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		// 执行优先队列的offer方法,将元素插入
		q.offer(e);
		// 如果优先队列的首元素为该元素
		if (q.peek() == e) {
			// 将leader置为null,代表可以获取该元素
			leader = null;
			// 通知工作线程来获取该元素
			available.signal();
		}
		// 返回true
		return true;
	} finally {
		lock.unlock();
	}
}

put方法为阻塞队列(BlockingQueue)的方法,其直接调用的offer方法:

public void put(E e) {
	offer(e);
}

offer(E e, long timeout, TimeUnit unit)也属于阻塞队列(BlockingQueue)的方法,入参需要额外指定超时时间timeout,但仍然直接调用的offer方法,timeout未发挥作用。

public boolean offer(E e, long timeout, TimeUnit unit) {
	return offer(e);
}

由于底层存储元素的PriorityQueue优先队列相当于是"无界"队列(最多Integer.MAX_VALUE-8个元素),所以添加元素的阻塞与非阻塞实现是一致的。

出队方法

poll()为非阻塞方法,若首元素的延迟时间未到,则直接返回null:

public E poll() {
	final ReentrantLock lock = this.lock;
	// 加锁
	lock.lock();
	try {
		// 获取优先队列的首元素
		E first = q.peek();
		// 如果首元素为null或者首元素的延迟时间>0,则直接返回null
		// 如果首元素不为null,且首元素的延迟时间<=0,则执行q.poll(),返回优先队列的首元素
		return (first == null || first.getDelay(NANOSECONDS) > 0)
			? null
			: q.poll();
	} finally {
		// 释放锁
		lock.unlock();
	}
}

而poll(long timeout, TimeUnit unit)则是阻塞方法,timeout为超时时间,其会在timeout时间内反复尝试获取优先队列的首元素。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
	long nanos = unit.toNanos(timeout);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		for (;;) {
			E first = q.peek();
			// 若优先队列首元素为null
			if (first == null) {
				// 超时时间nanos<=0,代表已超时,直接返回null
				if (nanos <= 0L)
					return null;
				// 否则阻塞等待nanos纳秒
				else
					nanos = available.awaitNanos(nanos);
			// 若预先队列首元素不为null
			} else {
				// 获取首元素的延迟时间
				long delay = first.getDelay(NANOSECONDS);
				// 如果延迟时间已到,代表任务已到期,则不管timeout是否已经<=0,均直接执行q.poll()
				if (delay <= 0L)
					return q.poll();
				// 任务未到期,但timeout已经<=0,直接返回null
				if (nanos <= 0L)
					return null;
				first = null; // don't retain ref while waiting
				// 若nanos<delay或者leader不为null,阻塞等待nanos纳秒
				if (nanos < delay || leader != null)
					nanos = available.awaitNanos(nanos);
					// 阻塞唤醒之后,继续进入到for(;;)循环,此时nanos<=0,然后会返回null
				else {
				// 若nanos>=delay且leader为null
				// 则将当前线程设置为leader,并阻塞等待delay纳秒
					Thread thisThread = Thread.currentThread();
					leader = thisThread;
					try {
						long timeLeft = available.awaitNanos(delay);
						// 被唤醒之后,更新nanos为nanos-(delay - timeLeft),继续执行新一轮的for(;;)
						nanos -= delay - timeLeft;
					} finally {
						if (leader == thisThread)
							leader = null;
					}
				}
			}
		}
	} finally {
		// 如果leader已置为null且优先队列q中首元素不为null,则唤醒等待的线程,并释放锁
		if (leader == null && q.peek() != null)
			available.signal();
		lock.unlock();
	}
}

take()是阻塞方法,若首元素的延迟时间未到,则持续阻塞:

public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	// 添加可中断锁
	lock.lockInterruptibly();
	try {
		for (;;) {
			E first = q.peek();
			// 若优先队列为空,代表无可消费的元素,则消费线程执行awit()方法阻塞等待
			if (first == null)
				available.await();
			// 若优先队列不为空
			else {
				// 获取优先队列首元素的过期时间
				long delay = first.getDelay(NANOSECONDS);
				// 若已过期,则直接返回优先队列的首元素
				if (delay <= 0L)
					return q.poll();
				first = null; // don't retain ref while waiting
				// 若leader不为空,则证明已有线程在拉取首元素,当前线程阻塞
				if (leader != null)
					available.await();
				else {
				// 若leader不为空,则将leader设置为自身线程,然后await delay
				// 延迟时间到达之后,会重新进入到for(;;) 此时获取到first的delay<=0,执行q.poll()返回首元素
					Thread thisThread = Thread.currentThread();
					leader = thisThread;
					try {
						available.awaitNanos(delay);
					} finally {
						if (leader == thisThread)
							leader = null;
					}
				}
			}
		}
	} finally {
		// 如果leader已置为null且优先队列q中首元素不为null,则唤醒等待的线程,并释放锁
		if (leader == null && q.peek() != null)
			available.signal();
		lock.unlock();
	}
}

可以看到。take()阻塞方法是若队列为空,则持续await阻塞挂起,直到被队列中添加元素的线程signal唤醒;

举报

相关推荐

0 条评论