获取锁
# name 为锁的 key;getLock 会把 name 赋值给 RedissonObject 中的 name 属性,后续加锁、解锁的 Lua 脚本都通过 getName() 取出它,作为 Redis 的 key(KEYS[1])
// Obtain the RLock handle for the lock key `name` from the Redisson client.
RLock lock = redissonClient.getLock(name);
加锁源码分析
// Caller-side entry point: acquire the lock. The methods that follow trace this call.
lock.lock();
/**
 * Acquires the lock by delegating to {@link #lockInterruptibly()}.
 *
 * <p>An {@link InterruptedException} from the delegate is not propagated:
 * the thread's interrupt flag is set again and this method returns normally,
 * so the caller receives no direct signal that the wait was cut short.
 */
@Override
public void lock() {
    try {
        // Main flow: the interruptible variant does the actual work.
        lockInterruptibly();
    } catch (InterruptedException interrupted) {
        // Keep the interruption visible to code further up the stack.
        Thread.currentThread().interrupt();
    }
}
/**
 * Acquires the lock interruptibly without an explicit lease time.
 *
 * <p>Forwards a lease time of -1 and a null unit; {@code tryAcquireAsync}
 * treats -1 as "no explicit lease" and falls back to the configured lock
 * watchdog timeout.
 *
 * @throws InterruptedException propagated from the two-argument overload
 */
@Override
public void lockInterruptibly() throws InterruptedException {
    // -1 is the sentinel for "caller did not specify a lease time".
    final long noExplicitLease = -1;
    lockInterruptibly(noExplicitLease, null);
}
/**
 * Acquires the lock, waiting until it becomes available.
 *
 * <p>Flow: one immediate attempt; if that fails, subscribe for this thread,
 * then retry in a loop, parking on the entry's latch between attempts. The
 * subscription is always removed in {@code finally}.
 *
 * @param leaseTime lease duration forwarded to {@code tryAcquire}; -1 makes
 *                  {@code tryAcquireAsync} use the lock watchdog timeout
 * @param unit      unit of {@code leaseTime}
 * @throws InterruptedException declared by this method; presumably raised by
 *                              the subscription sync or the latch waits
 */
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    final long currentThreadId = Thread.currentThread().getId();

    // Fast path: a null TTL from tryAcquire means the lock was acquired.
    if (tryAcquire(leaseTime, unit, currentThreadId) == null) {
        return;
    }

    // Slow path: subscribe so that the message published by the unlock script
    // can wake this thread (LockPubSub#onMessage releases the latch).
    RFuture<RedissonLockEntry> subscription = subscribe(currentThreadId);
    commandExecutor.syncSubscription(subscription);
    try {
        // Retry until tryAcquire reports success (null); a non-null value is
        // the `pttl` of the lock key as returned by the acquire script.
        for (Long remainingTtl = tryAcquire(leaseTime, unit, currentThreadId);
                remainingTtl != null;
                remainingTtl = tryAcquire(leaseTime, unit, currentThreadId)) {
            if (remainingTtl < 0) {
                // No usable timeout: wait until a permit is released.
                getEntry(currentThreadId).getLatch().acquire();
            } else {
                // Wait for a permit, but at most until the current holder's
                // key would expire, then try again.
                getEntry(currentThreadId).getLatch().tryAcquire(remainingTtl, TimeUnit.MILLISECONDS);
            }
        }
    } finally {
        unsubscribe(subscription, currentThreadId);
    }
}
/**
 * Blocking wrapper around {@code tryAcquireAsync}.
 *
 * @param leaseTime lease duration, or -1 for the watchdog-managed default
 * @param unit      unit of {@code leaseTime}
 * @param threadId  id of the thread attempting the acquisition
 * @return {@code null} when the lock was acquired; otherwise the value the
 *         acquire script returned ({@code pttl} of the lock key)
 */
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    // tryAcquireAsync performs the actual acquisition; get() waits for its result.
    RFuture<Long> pendingTtl = tryAcquireAsync(leaseTime, unit, threadId);
    return get(pendingTtl);
}
/**
 * Starts an asynchronous lock attempt.
 *
 * <p>With an explicit lease time the attempt is forwarded unchanged. With the
 * sentinel -1 the configured lock watchdog timeout is used as the lease, and
 * expiration renewal is scheduled once the attempt succeeds.
 *
 * @param leaseTime lease duration, or -1 when the caller gave none
 * @param unit      unit of {@code leaseTime}; unused on the -1 path
 * @param threadId  id of the thread attempting the acquisition
 * @return future yielding {@code null} on success, otherwise the value
 *         returned by the acquire script
 */
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    boolean explicitLease = leaseTime != -1;
    if (explicitLease) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }

    // Default path (leaseTime == -1): lease the key for the watchdog timeout.
    long watchdogTimeoutMillis = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    RFuture<Long> acquireFuture = tryLockInnerAsync(
            watchdogTimeoutMillis, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

    acquireFuture.onComplete((remainingTtl, failure) -> {
        // Only a clean completion with a null TTL means the lock is ours.
        boolean acquired = failure == null && remainingTtl == null;
        if (acquired) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return acquireFuture;
}
# 主要加锁流程 RedissonLock#tryLockInnerAsync方法
/**
 * Runs the Lua script that performs the actual lock acquisition.
 *
 * <p>Script inputs: KEYS[1] = {@code getName()}, ARGV[1] = lease time in
 * milliseconds, ARGV[2] = {@code getLockName(threadId)}. The script returns
 * nil when the lock is acquired (fresh or re-entrant) and the key's
 * {@code pttl} otherwise.
 *
 * <p>Side effect: stores the lease time, converted to milliseconds, in
 * {@code internalLockLeaseTime}.
 *
 * @param leaseTime lease duration
 * @param unit      unit of {@code leaseTime}
 * @param threadId  id of the thread attempting the acquisition
 * @param command   eval command that determines the result type
 * @return future with the script result
 */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    // Redisson implements the locking logic as a single Lua script.
    final String acquireScript =
            // Key does not exist yet: nobody holds the lock.
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                // Create a hash with the caller's lock name as field and a hold count of 1.
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                // Apply the lease as the key's expiry.
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                // nil signals "acquired".
                "return nil; " +
            "end; " +
            // Re-entrant case: the hash already contains the caller's field.
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                // Increment the hold count.
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                // Refresh the expiry.
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                // nil signals "acquired".
                "return nil; " +
            "end; " +
            // Held by someone else: report the remaining time to live.
            "return redis.call('pttl', KEYS[1]);";

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            acquireScript,
            Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
释放锁源码分析
// Caller-side entry point: release the lock. The methods that follow trace this call.
lock.unlock();
/**
 * Releases the lock on behalf of the current thread and waits for completion.
 *
 * @throws IllegalMonitorStateException unwrapped from a {@code RedisException}
 *         whose cause is of that type (produced by {@code unlockAsync} when the
 *         unlock script reports the lock is not held by this thread)
 */
@Override
public void unlock() {
    try {
        // Main flow: unlockAsync does the work; get() waits for it.
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        Throwable cause = e.getCause();
        if (cause instanceof IllegalMonitorStateException) {
            // Surface the lock-misuse error directly instead of the wrapper.
            throw (IllegalMonitorStateException) cause;
        }
        throw e;
    }
}
/**
 * Asynchronously releases the lock for the given thread.
 *
 * <p>Outcomes of the underlying unlock script:
 * <ul>
 *   <li>failure: renewal is cancelled for {@code threadId} and the promise fails;</li>
 *   <li>{@code null}: the script found no hold for this thread, so the promise
 *       fails with {@link IllegalMonitorStateException};</li>
 *   <li>{@code true}: renewal is cancelled and the promise succeeds;</li>
 *   <li>{@code false}: the promise succeeds without cancelling renewal
 *       (the script returns 0 when the re-entrant hold count is still positive).</li>
 * </ul>
 *
 * @param threadId id of the thread releasing the lock
 * @return future completed when the release has been processed
 */
@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> unlockPromise = new RedissonPromise<Void>();

    // Main flow: run the unlock script, then translate its result.
    unlockInnerAsync(threadId).onComplete((released, failure) -> {
        if (failure != null) {
            cancelExpirationRenewal(threadId);
            unlockPromise.tryFailure(failure);
        } else if (released == null) {
            unlockPromise.tryFailure(new IllegalMonitorStateException(
                    "attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId));
        } else {
            if (released) {
                cancelExpirationRenewal(null);
            }
            unlockPromise.trySuccess(null);
        }
    });

    return unlockPromise;
}
/**
 * Runs the Lua script that releases one hold on the lock.
 *
 * <p>Script inputs: KEYS[1] = {@code getName()}, KEYS[2] = {@code getChannelName()},
 * ARGV[1] = {@code LockPubSub.UNLOCK_MESSAGE}, ARGV[2] = {@code internalLockLeaseTime},
 * ARGV[3] = {@code getLockName(threadId)}.
 *
 * <p>Script results: 1 when the key is absent or was deleted (an unlock message
 * is published in both cases), 0 when the caller still holds the lock
 * re-entrantly, nil when the caller holds no entry in the hash.
 *
 * @param threadId id of the thread releasing the lock
 * @return future with the script result evaluated via {@code RedisCommands.EVAL_BOOLEAN}
 */
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    final String releaseScript =
            // Key does NOT exist (e.g. already gone): just notify waiters.
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                // Publish the unlock message.
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            // Key exists but the caller has no field in it: not the holder.
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // Re-entrant release: decrement the caller's hold count.
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                // Still held: refresh the expiry and report "not fully released".
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // Last hold released: delete the key.
                "redis.call('del', KEYS[1]); " +
                // Publish the unlock message.
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;";

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            releaseScript,
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
解锁脚本通过 publish 发布消息后,订阅方会回调 LockPubSub#onMessage 方法,由它释放信号量来唤醒等待锁的线程
/**
 * Handles a message received for a lock entry.
 *
 * <p>{@code UNLOCK_MESSAGE}: runs at most one queued listener and releases a
 * single permit on the entry's latch. {@code READ_UNLOCK_MESSAGE}: runs every
 * queued listener and releases as many permits as there are threads queued on
 * the latch. Any other message is ignored.
 *
 * @param value   entry holding the listeners and the latch
 *                (presumably a java.util.concurrent.Semaphore; confirm)
 * @param message message payload to dispatch on
 */
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable nextListener = value.getListeners().poll();
        if (nextListener != null) {
            nextListener.run();
        }
        // Closes the loop with the acquire path: releasing a permit wakes a
        // thread parked on this latch while waiting for the lock.
        value.getLatch().release();
        return;
    }

    if (message.equals(READ_UNLOCK_MESSAGE)) {
        // Drain the listener queue completely.
        for (Runnable listener = value.getListeners().poll();
                listener != null;
                listener = value.getListeners().poll()) {
            listener.run();
        }
        value.getLatch().release(value.getLatch().getQueueLength());
    }
}