1 背景
Kafka存在大量的延时操作,比如延时生产、延时消费或者延时删除,实现延时操作有很多办法,JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1),所以kafka并没有像jdk的Timer或者DelayQueue那样来实现延时的功能,而是基于时间轮实现延时。
2 时间轮
2.1 什么是时间轮
对比上图的⌚️,秒针在0~59秒都会落一次,假如任务的延时时间是0——59,那秒针落在哪个时刻,哪个时刻对应的任务执行。
时间轮的本质是一个环形数组,当指针每走一步,就获取到当前刻度挂载的任务进行执行
2.2 最简单的版本实现
class Task {
}
@Data
class Bucket {
@Delegate
private List<Task> taskList = new ArrayList<>();
}
class Timer {
private Bucket[] bucketList = new Bucket[60];
public Timer (){
Arrays.fill(bucketList, new Bucket());
}
private int getBucketByTimestamp(long timestamp){
return (int)(timestamp / 1000) % 60;
}
public boolean addTask(Task task, long timestamp) {
int bucket = this.getBucketByTimestamp(timestamp);
bucketList[bucket].add(task);
return true;
}
public Bucket getBucket(long timestamp){
int bucket = this.getBucketByTimestamp(timestamp);
return bucketList[bucket];
}
public void run() throws InterruptedException {
while (true) {
Bucket bucket = this.getBucket(System.currentTimeMillis());
List<Task> taskList = bucket.getTaskList();
if (taskList.size() != 0) {
// 线程池do task
}
TimeUnit.SECONDS.sleep(1);
}
}
}
3 长时间的任务
上面的实现表示的范围有限(只能表示60s之内的定时任务),超过的就没办法表示了,解决办法有两个
3.1 记录圈数
在记录任务的时候,同时记录下任务的圈数,当圈数为0时,代表需要执行任务,否则圈数-1。这种实现方式存在缺点,每一个刻度上的任务非常多,导致效率降低,和HashMap的hash冲突类似。
@Data
@AllArgsConstructor
@NoArgsConstructor
class Task {
private int num = 0; // 圈数
}
@Data
class Bucket {
@Delegate
private List<Task> taskList = new ArrayList<>();
}
class Timer {
private Bucket[] bucketList = new Bucket[60];
public Timer (){
Arrays.fill(bucketList, new Bucket());
}
private int getBucketByTimestamp(long timestamp){
return (int)(timestamp / 1000) % 60;
}
public boolean addTask(Task task, long timestamp) {
int bucket = this.getBucketByTimestamp(timestamp);
bucketList[bucket].add(task);
return true;
}
public List<Task> getTaskList(long timestamp){
int bucket = this.getBucketByTimestamp(timestamp);
bucketList[bucket].getTaskList().stream().filter(it -> it.getNum() != 0).forEach(it -> it.setNum(it.getNum() - 1));
return bucketList[bucket].getTaskList().stream().filter(it -> it.getNum() == 0).collect(Collectors.toList());
}
public void run() throws InterruptedException {
while (true) {
List<Task> taskList = this.getTaskList(System.currentTimeMillis());
if (taskList.size() != 0) {
// 线程池do task
}
TimeUnit.SECONDS.sleep(1);
}
}
}
3.2 层级时间轮
对于只有三个指针的表来说,最大能表示12个小时,超过了12小时这个范围,时间就会产生歧义。如果我们加多几个指针呢?比如说我们有秒针,分针,时针,上下午针,天针,月针,年针。。。而且,它并不需要占用很大的内存。
对应到层级时间轮,当超过当前时间轮表示范围后,尝试添加到上层的时间轮(不设上限)。比如有一个600s后执行的任务,秒级时间轮不能处理,需要尝试添加到分钟级别;后续如果任务剩余时间小于60s,对任务进行降级,将任务添加到秒级时间轮,最后保证所有任务都能执行。
@Data
@AllArgsConstructor
@NoArgsConstructor
class Task {
private int num = 0; // 圈数
}
@Data
class Bucket {
@Delegate
private List<Task> taskList = new ArrayList<>();
}
@Builder
class Timer {
// 一个bucket的大小
private long tickMs;
// 有多少bucket
private int wheelSize;
// 上一级时间轮
private Timer upTimer;
// 时间轮的时间跨度
private long interval;
private Bucket[] bucketList = new Bucket[60];
public Timer (){
Arrays.fill(bucketList, new Bucket());
}
private int getBucketByTimestamp(long timestamp){
return (int)(timestamp / 1000) % 60;
}
private synchronized Timer getUpTimer(){
if (Objects.isNull(this.upTimer)) {
this.upTimer = Timer.builder()
.interval(this.interval * 60)
.tickMs(this.tickMs * 60)
.wheelSize(60)
.build();
}
return this.upTimer;
}
private void advance(){
if (Objects.isNull(this.upTimer)) {
return ;
}
// 推动upTimer状态,将可以降级的bucket进行降级处理。
}
public boolean addTask(Task task, long timestamp) {
long delayMs = timestamp - System.currentTimeMillis();
if (delayMs < tickMs) {
// 过期了,可以抛给上层处理
return false;
} else {
// 没过期,扔进当前时间轮的某个槽中
if (delayMs < interval) {
int bucketIndex = (int) (((delayMs + System.currentTimeMillis()) / tickMs) % wheelSize);
Bucket bucket = bucketList[bucketIndex];
bucket.add(task);
} else {
// 扔到上一级
Timer timeWheel = this.getUpTimer();
timeWheel.addTask(task, timestamp);
}
}
return true;
}
public List<Task> getTaskList(long timestamp){
int bucket = this.getBucketByTimestamp(timestamp);
bucketList[bucket].getTaskList().stream().filter(it -> it.getNum() != 0).forEach(it -> it.setNum(it.getNum() - 1));
return bucketList[bucket].getTaskList().stream().filter(it -> it.getNum() == 0).collect(Collectors.toList());
}
public void run() throws InterruptedException {
while (true) {
List<Task> taskList = this.getTaskList(System.currentTimeMillis());
if (taskList.size() != 0) {
// 线程池do task
}
// 执行后需要推动一下状态,相当于表的电池
this.advance();
TimeUnit.SECONDS.sleep(1);
}
}
}
3.3 如何判断任务已过期
这个过期有可能说的是不符合当前时间轮,需要重新分配。可以使用delayedQueue(之前说delayedQueue复杂度过高是对任务来说,我们可以针对bucket来说,不管任务有多少个,bucket不会变),而这个槽到期后,也就是被我们从delayQueue中poll出来后,我们只需要按照添加新任务的逻辑,将槽中的所有任务循环一次,重新加到新的槽中即可。