一、DelayQueue 的核心原理
Delayed 接口:队列元素需实现此接口,提供两个方法:
long getDelay(TimeUnit unit):返回元素剩余的延迟时间(单位由 unit 指定)。
int compareTo(Delayed o):用于队列内部排序(按延迟时间升序,确保最早到期的元素优先被取出)。
阻塞特性:
当调用 take() 方法时,若队列中没有到期的元素,线程会阻塞等待,直到有元素到期。
当调用 poll() 方法时,若没有到期的元素,会直接返回 null(非阻塞)。
二、DelayQueue 的使用步骤
- 定义延迟元素类:实现 Delayed 接口,重写 getDelay() 和 compareTo() 方法。
- 创建 DelayQueue 实例:用于存储延迟元素。
- 添加元素到队列:通过 add() 或 offer() 方法入队。
- 取出到期元素:通过 take()(阻塞)或 poll()(非阻塞)方法出队。
三、代码实例:实现一个定时任务调度器
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 延迟任务元素:实现Delayed接口
*/
class DelayTask implements Delayed {
// 任务名称
private String taskName;
// 任务执行时间(毫秒时间戳)
private long executeTime;
// 任务逻辑
private Runnable task;
public DelayTask(String taskName, long delay, TimeUnit unit, Runnable task) {
this.taskName = taskName;
// 计算任务执行时间:当前时间 + 延迟时间
this.executeTime = System.currentTimeMillis() + unit.toMillis(delay);
this.task = task;
}
/**
* 返回剩余延迟时间
*/
@Override
public long getDelay(TimeUnit unit) {
// 剩余时间 = 执行时间 - 当前时间
long remaining = executeTime - System.currentTimeMillis();
return unit.convert(remaining, TimeUnit.MILLISECONDS);
}
/**
* 按执行时间排序(延迟时间短的优先)
*/
@Override
public int compareTo(Delayed o) {
DelayTask other = (DelayTask) o;
return Long.compare(this.executeTime, other.executeTime);
}
// 执行任务
public void run() {
System.out.println("执行任务:" + taskName + ",当前时间:" + System.currentTimeMillis());
task.run(); // 执行实际任务逻辑
}
}
/**
* 定时任务调度器:使用DelayQueue管理延迟任务
*/
class TaskScheduler {
// 延迟队列存储任务
private final DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
// 工作线程:负责取出到期任务并执行
private final Thread worker;
public TaskScheduler() {
worker = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
// 阻塞等待,直到有任务到期
DelayTask task = delayQueue.take();
// 执行到期任务
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
break;
}
}
});
worker.start(); // 启动工作线程
}
// 添加延迟任务
public void schedule(String taskName, long delay, TimeUnit unit, Runnable task) {
delayQueue.add(new DelayTask(taskName, delay, unit, task));
System.out.println("添加任务:" + taskName + ",延迟时间:" + delay + unit);
}
// 关闭调度器
public void shutdown() {
worker.interrupt();
}
}
/**
* 测试类
*/
public class DelayQueueExample {
public static void main(String[] args) throws InterruptedException {
TaskScheduler scheduler = new TaskScheduler();
// 添加3个延迟任务
scheduler.schedule("任务1", 1, TimeUnit.SECONDS, () ->
System.out.println("任务1:发送验证码")
);
scheduler.schedule("任务2", 3, TimeUnit.SECONDS, () ->
System.out.println("任务2:关闭订单")
);
scheduler.schedule("任务3", 2, TimeUnit.SECONDS, () ->
System.out.println("任务3:发送推送通知")
);
// 等待所有任务执行完成(这里简化处理,实际可根据业务调整)
Thread.sleep(5000);
scheduler.shutdown();
}
}
四、代码解析
DelayTask 类:
实现 Delayed 接口,存储任务名称、执行时间、实际任务逻辑(Runnable)。
getDelay() 计算剩余延迟时间(执行时间 - 当前时间)。
compareTo() 确保队列按执行时间升序排序,保证最早到期的任务先被执行。
TaskScheduler 类:
内部维护 DelayQueue 存储延迟任务。
启动一个工作线程,通过 take() 阻塞等待到期任务,取出后执行。
提供 schedule() 方法添加任务,shutdown() 方法停止调度器。
五、DelayQueue 的适用场景
定时任务调度:如延迟执行的任务(订单超时关闭、定时提醒)。
缓存过期清理:存储缓存元素,到期后自动移除。
重试机制:失败任务延迟重试(如接口调用失败后,10 秒后重试)。
限流控制:控制请求频率(如令牌桶算法中,定时添加令牌)。