纯纯自己一遍遍 debug 的源码,限于文章篇幅,只保留关键节点源码,持续更新
看门狗由来
先来抛出一个问题:锁因业务代码执行时间过长导致被提前释放,出现这种情况你会怎么去解决?
解决方案
- 将锁设置为永不过期,会造成死锁问题,且这把死锁只能手动运维去 redis 中删除
- 使用 redis 看门狗机制,开启一个定时任务给锁续命(锁默认 30s 过期,每隔 10s刷新过期时间至 30s)优点在于,锁被设置了过期时间,当出现死锁问题,重启服务器可以关闭定时任务,不至于一直死锁
watchDog 底层用到的 Lua 脚本,没有用 setnx,而是用 hash 结构实现的。至于看门狗续命的条件,只针对于没有设置过期时间的方法,例如下面这些方法而言。
RLock lock = redisson.getLock("watch_key");
lock.lock(); //开启看门狗
lock.tryLock(3, TimeUnit.SECONDS); //开启看门狗
而针对于设置了过期时间的方法,这些方法不开启看门狗
userLock.lock(18, TimeUnit.SECONDS); //到期自动释放
userLock.tryLock(0, 3, TimeUnit.SECONDS);//到期自动释放
开启看门狗 timeOut(触发时机)
里面涉及到Netty、Promise、 lua 脚本!!!!
先来说结论:其加锁是通过异步执行Lua 脚本实现的,然后设置一个监听器监听异步返回的结果,因为Redisson 底层通过Netty去操纵Redis的,当channelRead收到 lua 脚本的执行结果后,会触发promise.trySuccess(null)方法(修改promise 的状态), promise的状态变化了,于是会触发监听器也就是执行onComplete里面的逻辑,最终实时的去启动一个看门狗去续命。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//执行加锁 lua 脚本
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
//锁剩余时间为 null,说明加锁成功
if (ttlRemaining == null) {
//开启看门狗
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
由于结合了Netty 操作 redis,以及 promise 机制,在channelRead中不可能每收到一次redis 响应,就重复的去开启看门狗定时器,这对于系统资源来说都是灾难,因此onComplete里面加了一个判断只有是加锁成功 (ttlRemaining==null)的请求,才会开启看门狗续命,另外如果线程 a 加锁后又释放锁了,线程 b 接着加锁,线程 b 也会重复的开启看门狗定时器去续命。由于线程a 解锁操作里面将自己的看门狗释放了,不会造成内存问题。
另外 redisson 对 redis 的操作是包装成了一个RedisExecutor进行执行的。先是获取 redis 连接,然后发送 lua 脚本执行。其他逻辑没有细看
public void execute() {
if (mainPromise.isCancelled()) {
free();
return;
}
//redis 连接关闭
if (!connectionManager.getShutdownLatch().acquire()) {
free();
//监听器发布取消事件
mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));
return;
}
codec = getCodec(codec);
RFuture<RedisConnection> connectionFuture = getConnection();
RPromise<R> attemptPromise = new RedissonPromise<R>();
mainPromiseListener = (r, e) -> {
if (mainPromise.isCancelled() && connectionFuture.cancel(false)) {
log.debug("Connection obtaining canceled for {}", command);
//看门狗注销
timeout.cancel();
//监听器修改状态为取消
if (attemptPromise.cancel(false)) {
free();
}
}
};
if (attempt == 0) {
mainPromise.onComplete((r, e) -> {
if (this.mainPromiseListener != null) {
this.mainPromiseListener.accept(r, e);
}
});
}
//redis 连接重试
scheduleRetryTimeout(connectionFuture, attemptPromise);
//监听获取 redis 连接是否成功
connectionFuture.onComplete((connection, e) -> {
if (connectionFuture.isCancelled()) {
connectionManager.getShutdownLatch().release();
return;
}
//redis 连接不成功释放资源,直接返回
if (!connectionFuture.isSuccess()) {
connectionManager.getShutdownLatch().release();
exception = convertException(connectionFuture);
return;
}
//连接成功发送 lua 脚本到 redis 执行
sendCommand(attemptPromise, connection);
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(writeFuture, attemptPromise, connection);
}
});
//释放 redis 连接
releaseConnection(attemptPromise, connectionFuture);
});
//监听器检查,释放 redis 连接的执行结果
attemptPromise.onComplete((r, e) -> {
checkAttemptPromise(attemptPromise, connectionFuture);
});
}
加锁lua 脚本
直接贴源码中的Lua 脚本,主要是利用 redis 中的 hash 结构实现的。主要分俩种情况
- 线程第一次加锁(设置hash 键值对)
- 同一线程多次加锁(对 value 值累加)
因此Redisson 除了是一把互斥锁外还是一把可重入锁,此外 lua 脚本中的 redis.call 意思就是运行指定 redis 命令,return nil 代表返回默认值为 null。
//判断锁的键值对是否存在
if (redis.call('exists', KEYS[1]) == 0) then
//不存在,将锁写入 redis 并且设置过期时间
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end ;
//同一个线程第一次加锁成功,第二次加锁的时候会走到这,发现 hash 里面已经有锁的记录了
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
//记录锁的重入次数加 1,并且重新设置锁的过期时间
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end ;
//返回锁的过期时间
return redis.call('pttl', KEYS[1]);
解锁lua脚本
- 判断锁是否存在,存在无需解锁了
- 消除锁重入次数
- 直到重入次数全部被消除,删除锁
因此我们在使用多次 lock 的时候要配套多次使用 unlock,不然造成锁删不掉!!!!
-- 判断锁是否存在,存在无需解锁了
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end ;
-- 消除锁重入次数
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 重入次数大于 1,设置过期时间
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
-- 直到重入次数全部被消除,删除锁
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end ;
return nil;
取消看门狗 timeOut
unlockInnerAsync里面会调用解锁的 lua 脚本,同时返回的RFuture是一个监听器,用来监听lua 脚本执行结果,当 lua 执行成功,触发 cancelExpirationRenewal方法,里面会获取到对应的Timeout,取消看门狗定时器
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
//调用解锁 lua 脚本
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
//取消看门狗 timeOut 定时器
cancelExpirationRenewal(threadId);
if (e != null) {
//发布失败事件
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
//发布执行成功事件
result.trySuccess(null);
});
return result;
}
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
//取消看门狗
timeout.cancel();
}
//同时从 EXPIRATION_RENEWAL_MAP 中移除
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
到此Redisson的加锁和解锁流程都梳理好了。下面来讲讲Redisonn 源码中大量用到的Promise、timeOut 怎么进行使用的
io.netty.util.concurrent.Promise 监听器使用
代码如下,当Promise状态改变的时候,会触发onComplete中的方法。
public class main {
private static final EventExecutor executor = ImmediateEventExecutor.INSTANCE;
private static final Promise<String> promise = new DefaultPromise<>(executor);
public static void main(String[] args) {
executor.execute(() -> {
try {
//触发监听器
promise.trySuccess(someAsyncOperation());
} catch (Exception e) {
promise.setFailure(e);
}
});
//添加监听器,监听结果
onComplete((ttlRemaining, e) -> {
System.out.println("#Result: " + ttlRemaining);
});
//添加监听器,监听结果
onComplete((ttlRemaining, e) -> {
System.out.println("#Result2: " + ttlRemaining);
});
}
public static void onComplete(BiConsumer<? super String, ? super Throwable> action) {
promise.addListener(f -> {
if (!f.isSuccess()) {
action.accept(null, f.cause());
return;
}
System.out.println("#addListener #onComplete");
action.accept(String.valueOf(f.getNow()), null);
});
}
private static String someAsyncOperation() throws Exception {
Thread.sleep(1000);
return "lua 脚本执行成功";
}
}
io.netty.util.Timeout 使用
摘的源码,就是一个定时器,每隔 10s 递归执行一次
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//执行解锁 lua 脚本
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
//解锁 lua 脚本执行成功,递归执行
if (res) {
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
本文只剖析Redisson ,加锁、解锁、看门狗这部分的源码,至于涉及到Netty 的源码没有追溯,需要从 channelRead 的调用链一直往下 debug。链路很长。本文就到这结束!