0
点赞
收藏
分享

微信扫一扫

Redisson#lock加锁原理

求阙者 2022-03-12 阅读 114
java redis

获取锁

// name 为锁的 key,会被赋值给 RedissonObject 中的 name 属性,后续加锁时会用到
// Obtain the RLock handle bound to the key `name`; the lock itself is only taken later via lock.lock().
RLock lock = redissonClient.getLock(name);

加锁源码分析

// Call site: blocking acquire. The implementation of lock() is shown directly below.
lock.lock();
/**
 * Acquires the lock, blocking until it becomes available.
 *
 * <p>Delegates to {@link #lockInterruptibly()}. If the waiting thread is
 * interrupted, the interrupt flag is restored and this method returns
 * normally; the method is {@code void}, so nothing here tells the caller
 * whether the lock was actually obtained in that case.
 */
@Override
public void lock() {
	try {
		// All of the real work happens in the interruptible variant.
		lockInterruptibly();
	} catch (InterruptedException interrupted) {
		// Keep the interrupt status visible to code further up the stack.
		Thread.currentThread().interrupt();
	}
}
/**
 * Acquires the lock, allowing the wait to be interrupted.
 *
 * <p>Equivalent to {@code lockInterruptibly(-1, null)}. A lease time of
 * {@code -1} means no explicit lease was requested; {@code tryAcquireAsync}
 * then falls back to the configured lock watchdog timeout.
 *
 * @throws InterruptedException if the thread is interrupted while waiting
 */
@Override
public void lockInterruptibly() throws InterruptedException {
	// -1 is the sentinel for "no explicit lease time".
	final long noExplicitLease = -1;
	lockInterruptibly(noExplicitLease, null);
}
/**
 * Acquires the lock with the given lease time, waiting until it is free.
 *
 * <p>Flow:
 * <ol>
 *   <li>Try once; a {@code null} TTL from {@code tryAcquire} means the lock
 *       is now held and the method returns.</li>
 *   <li>Otherwise subscribe for this thread, so that the message published by
 *       the unlock script can wake the waiter.</li>
 *   <li>Loop: retry, and on failure wait on the entry's latch, for at most the
 *       reported TTL when it is non-negative and without a timeout otherwise.</li>
 *   <li>Always unsubscribe on the way out.</li>
 * </ol>
 *
 * @param leaseTime lease duration, or {@code -1} for no explicit lease
 * @param unit      unit of {@code leaseTime}; the no-arg overload passes {@code null}
 * @throws InterruptedException if the thread is interrupted while waiting
 */
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
	final long threadId = Thread.currentThread().getId();

	// Fast path: null means the lock was acquired on the first attempt.
	if (tryAcquire(leaseTime, unit, threadId) == null) {
		return;
	}

	// Contended: subscribe before retrying so an unlock notification can wake this thread.
	RFuture<RedissonLockEntry> subscription = subscribe(threadId);
	commandExecutor.syncSubscription(subscription);

	try {
		Long remainingTtl;
		// Keep retrying until tryAcquire reports success (null).
		while ((remainingTtl = tryAcquire(leaseTime, unit, threadId)) != null) {
			if (remainingTtl >= 0) {
				// Wait for a wake-up, but no longer than the current holder's remaining TTL.
				getEntry(threadId).getLatch().tryAcquire(remainingTtl, TimeUnit.MILLISECONDS);
			} else {
				// Negative TTL: wait on the latch with no timeout.
				getEntry(threadId).getLatch().acquire();
			}
		}
	} finally {
		unsubscribe(subscription, threadId);
	}
}
/**
 * Synchronous wrapper around {@code tryAcquireAsync}.
 *
 * @param leaseTime lease duration, or {@code -1} for no explicit lease
 * @param unit      unit of {@code leaseTime}
 * @param threadId  id of the thread requesting the lock
 * @return {@code null} when the lock was acquired, otherwise the TTL reported
 *         for the lock key (callers treat it as milliseconds)
 */
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
	RFuture<Long> pendingTtl = tryAcquireAsync(leaseTime, unit, threadId);
	// get(...) is defined elsewhere; presumably it blocks until the future completes.
	return get(pendingTtl);
}
/**
 * Starts an asynchronous attempt to take the lock.
 *
 * <p>With an explicit lease time the attempt goes straight to
 * {@code tryLockInnerAsync}. With the {@code -1} sentinel the configured lock
 * watchdog timeout is used as the lease instead, and once the lock is
 * acquired {@code scheduleExpirationRenewal} is called for the thread.
 *
 * @param leaseTime lease duration, or {@code -1} for no explicit lease
 * @param unit      unit of {@code leaseTime}; unused when {@code leaseTime} is {@code -1}
 * @param threadId  id of the thread requesting the lock
 * @return future completing with {@code null} when the lock was acquired,
 *         otherwise with the TTL of the lock key
 */
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
	// Explicit lease requested: no renewal is scheduled on this path.
	if (leaseTime != -1) {
		return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
	}

	// Default path: lease the lock for the watchdog timeout.
	long watchdogTimeout = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
	RFuture<Long> ttlFuture = tryLockInnerAsync(watchdogTimeout, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
	ttlFuture.onComplete((remainingTtl, failure) -> {
		// Only a successful reply with a null TTL means the lock is held.
		if (failure == null && remainingTtl == null) {
			scheduleExpirationRenewal(threadId);
		}
	});
	return ttlFuture;
}
// 主要加锁流程:RedissonLock#tryLockInnerAsync 方法
/**
 * Sends the Lua script that attempts to take (or re-enter) the lock.
 *
 * <p>Side effect: stores the lease time, converted to milliseconds, in
 * {@code internalLockLeaseTime} before the script is sent.
 *
 * <p>Script arguments as bound at the end of the call: {@code KEYS[1]} is
 * {@code getName()}, {@code ARGV[1]} is the lease time in milliseconds and
 * {@code ARGV[2]} is {@code getLockName(threadId)}.
 *
 * @param leaseTime lease duration expressed in {@code unit}
 * @param unit      time unit of {@code leaseTime}
 * @param threadId  id of the thread requesting the lock
 * @param command   command descriptor forwarded to {@code evalWriteAsync}
 *                  (the callers in this file pass {@code RedisCommands.EVAL_LONG})
 * @return future of the script reply: nil when the lock was taken or
 *         re-entered, otherwise the {@code pttl} of the lock key
 */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
	// Remember the lease in milliseconds; the same value is passed to the script as ARGV[1].
	internalLockLeaseTime = unit.toMillis(leaseTime);
	// Redisson implements the locking logic as a Lua script.
	return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
			  // Case 1: the lock key does not exist, so nobody holds the lock.
			  "if (redis.call('exists', KEYS[1]) == 0) then " +
				  // Create the hash with this holder's field (ARGV[2]) set to a hold count of 1.
				  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
				  // Give the key an expiry of ARGV[1] milliseconds.
				  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
				  // nil tells the caller the lock was acquired.
				  "return nil; " +
			  "end; " +
			  // Case 2 (re-entrancy): the hash already contains this holder's field.
			  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
				  // Increment the hold count by 1.
				  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
				  // Reset the expiry to ARGV[1] milliseconds.
				  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
				  // nil tells the caller the lock was acquired.
				  "return nil; " +
			  "end; " +
			  // Case 3: the key exists without this holder's field; report the key's remaining TTL.
			  "return redis.call('pttl', KEYS[1]);",
				Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

释放锁源码分析

// Call site: release the lock. The implementation of unlock() is shown directly below.
lock.unlock();
/**
 * Releases the lock on behalf of the calling thread.
 *
 * <p>Runs {@code unlockAsync} for the current thread id through {@code get(...)}.
 * A {@code RedisException} whose cause is an
 * {@link IllegalMonitorStateException} is unwrapped so that callers see the
 * monitor-state error directly; any other {@code RedisException} is rethrown
 * unchanged.
 *
 * @throws IllegalMonitorStateException if the unlock failed with that exception as its cause
 */
@Override
public void unlock() {
	try {
		// Main flow: the asynchronous unlock, consumed synchronously.
		get(unlockAsync(Thread.currentThread().getId()));
	} catch (RedisException e) {
		Throwable cause = e.getCause();
		if (cause instanceof IllegalMonitorStateException) {
			throw (IllegalMonitorStateException) cause;
		}
		throw e;
	}
}
/**
 * Releases the lock asynchronously for the given thread.
 *
 * <p>The outcome of {@code unlockInnerAsync} is mapped onto the returned future:
 * <ul>
 *   <li>failure: renewal is cancelled for {@code threadId} and the failure is propagated;</li>
 *   <li>{@code null} status: the future fails with an
 *       {@link IllegalMonitorStateException};</li>
 *   <li>{@code true}: renewal is cancelled, then the future succeeds;</li>
 *   <li>{@code false}: the future succeeds and renewal is left running.</li>
 * </ul>
 *
 * @param threadId id of the thread releasing the lock
 * @return future completed once the release attempt has been processed
 */
@Override
public RFuture<Void> unlockAsync(long threadId) {
	RPromise<Void> promise = new RedissonPromise<Void>();

	// Main flow: run the unlock script and translate its reply.
	unlockInnerAsync(threadId).onComplete((released, failure) -> {
		if (failure != null) {
			cancelExpirationRenewal(threadId);
			promise.tryFailure(failure);
			return;
		}

		if (released == null) {
			promise.tryFailure(new IllegalMonitorStateException(
					"attempt to unlock lock, not locked by current thread by node id: "
					+ id + " thread-id: " + threadId));
			return;
		}

		if (released) {
			// NOTE(review): null is passed here rather than threadId; its meaning is
			// defined by cancelExpirationRenewal, which is not shown in this excerpt.
			cancelExpirationRenewal(null);
		}
		promise.trySuccess(null);
	});

	return promise;
}
/**
 * Sends the Lua script that releases one hold on the lock for the given thread.
 *
 * <p>Script arguments as bound at the end of the call: {@code KEYS[1]} is
 * {@code getName()}, {@code KEYS[2]} is {@code getChannelName()},
 * {@code ARGV[1]} is {@code LockPubSub.UNLOCK_MESSAGE}, {@code ARGV[2]} is
 * {@code internalLockLeaseTime} and {@code ARGV[3]} is
 * {@code getLockName(threadId)}.
 *
 * <p>Script replies: 1 when the key is gone afterwards (it was already
 * missing, or this call removed the last hold), 0 when the holder still has
 * holds left, nil when the hash has no field for this holder.
 *
 * @param threadId id of the thread releasing the lock
 * @return future of the script reply decoded with {@code RedisCommands.EVAL_BOOLEAN}
 */
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
	return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
			// Case 1: the lock key does NOT exist (for example, it already expired).
			"if (redis.call('exists', KEYS[1]) == 0) then " +
				// Still publish the unlock message on the channel, then report 1.
				"redis.call('publish', KEYS[2], ARGV[1]); " +
				"return 1; " +
			"end;" +
			// Case 2: the key exists but has no field for this holder; return nil without changing anything.
			"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
				"return nil;" +
			"end; " +
			// Case 3: this holder owns the lock; decrement its re-entrant hold count.
			"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
			"if (counter > 0) then " +
				// Holds remain: reset the expiry to ARGV[2] milliseconds and report 0.
				"redis.call('pexpire', KEYS[1], ARGV[2]); " +
				"return 0; " +
			"else " +
				// Last hold released: delete the key.
				"redis.call('del', KEYS[1]); " +
				// Publish the unlock message on the channel.
				"redis.call('publish', KEYS[2], ARGV[1]); " +
				"return 1; "+
			"end; " +
			// Not reached: both branches of the if/else above return.
			"return nil;",
			Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

}

解锁消息发布后,订阅端收到该消息时会回调 LockPubSub#onMessage 方法

/**
 * Handles a message received for a lock entry.
 *
 * <p>For {@code UNLOCK_MESSAGE}: runs at most one queued listener, then
 * releases one permit on the entry's latch. For {@code READ_UNLOCK_MESSAGE}:
 * runs every queued listener, then releases as many permits as the latch's
 * reported queue length. Any other message is ignored.
 *
 * @param value   lock entry holding the listener queue and the latch
 * @param message message received from the channel
 */
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
	if (message.equals(UNLOCK_MESSAGE)) {
		Runnable listener = value.getListeners().poll();
		if (listener != null) {
			listener.run();
		}

		// Closes the loop with the lock path: release the latch that waiting threads acquire on.
		value.getLatch().release();
		return;
	}

	if (message.equals(READ_UNLOCK_MESSAGE)) {
		// Drain and run every pending listener.
		Runnable listener;
		while ((listener = value.getListeners().poll()) != null) {
			listener.run();
		}

		// Release one permit per thread the latch reports as queued.
		value.getLatch().release(value.getLatch().getQueueLength());
	}
}
举报

相关推荐

0 条评论