实现思路
引入Jedis
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
指令简介
zadd
zrem
zrangeByScore
Java实现Redis延时队列
import redis.clients.jedis.Jedis;
import java.util.Set;
public class DelayQueueWithRedis {
private Jedis jedis;
private String queueKey;
public DelayQueueWithRedis(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}
public void push(String message, long delaySeconds) {
long score = System.currentTimeMillis() / 1000 + delaySeconds;
jedis.zadd(queueKey, score, message);
}
public String pop() {
while (true) {
long now = System.currentTimeMillis() / 1000;
Set<String> messages = jedis.zrangeByScore(queueKey, 0, now, 0, 1);
if (messages.isEmpty()) {
System.out.println("No messages");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
} else {
String message = messages.iterator().next();
jedis.zrem(queueKey, message);
return message;
}
}
return null;
}
}
import redis.clients.jedis.Jedis;
public class MainP {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost",6379);
DelayQueueWithRedis delayQueue = new DelayQueueWithRedis(jedis, "delay_queue");
delayQueue.push("message1", 5);
delayQueue.push("message2", 10);
delayQueue.push("message3", 8);
}
}
import redis.clients.jedis.Jedis;
public class MainC {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost",6379);
DelayQueueWithRedis delayQueue = new DelayQueueWithRedis(jedis, "delay_queue");
while (true) {
String message = delayQueue.pop();
if (message != null) {
System.out.println("Consumed: " + message);
}
}
}
}