0
点赞
收藏
分享

微信扫一扫

Redisson源码研究(包括底层Netty、Promise实现)


纯纯自己一遍遍 debug 的源码,限于文章篇幅,只保留关键节点源码,持续更新

 

Redisson源码研究(包括底层Netty、Promise实现)_java

看门狗由来

先来抛出一个问题:锁因业务代码执行时间过长导致被提前释放,出现这种情况你会怎么去解决?

解决方案

  1. 将锁设置为永不过期,会造成死锁问题,且这把死锁只能手动运维去 redis 中删除
  2. 使用 redis 看门狗机制,开启一个定时任务给锁续命(锁默认 30s 过期,每隔 10s刷新过期时间至 30s)优点在于,锁被设置了过期时间,当出现死锁问题,重启服务器可以关闭定时任务,不至于一直死锁

watchDog 底层用到的 Lua 脚本,没有用 setnx,而是用 hash 结构实现的。至于看门狗续命的条件,只针对于没有设置过期时间的方法,例如下面这些方法而言。

RLock lock = redisson.getLock("watch_key");
lock.lock(); //开启看门狗
lock.tryLock(3, TimeUnit.SECONDS); //开启看门狗

而针对于设置了过期时间的方法,这些方法不开启看门狗

userLock.lock(18, TimeUnit.SECONDS); //到期自动释放
userLock.tryLock(0, 3, TimeUnit.SECONDS);//到期自动释放

开启看门狗 timeOut(触发时机)

里面涉及到Netty、Promise、 lua 脚本!!!!

先来说结论:其加锁是通过异步执行Lua 脚本实现的,然后设置一个监听器监听异步返回的结果,因为Redisson 底层通过Netty去操纵Redis的,当channelRead收到 lua 脚本的执行结果后,会触发promise.trySuccess(null)方法(修改promise 的状态), promise的状态变化了,于是会触发监听器也就是执行onComplete里面的逻辑,最终实时的去启动一个看门狗去续命。

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        //执行加锁 lua 脚本
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }
            //锁剩余时间为 null,说明加锁成功
            if (ttlRemaining == null) {
            //开启看门狗
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

由于结合了Netty 操作 redis,以及 promise 机制,在channelRead中不可能每收到一次redis 响应,就重复的去开启看门狗定时器,这对于系统资源来说都是灾难,因此onComplete里面加了一个判断只有是加锁成功 (ttlRemaining==null)的请求,才会开启看门狗续命,另外如果线程 a 加锁后又释放锁了,线程 b 接着加锁,线程 b 也会重复的开启看门狗定时器去续命。由于线程a 解锁操作里面将自己的看门狗释放了,不会造成内存问题。

另外 redisson 对 redis 的操作是包装成了一个RedisExecutor进行执行的。先是获取 redis 连接,然后发送 lua 脚本执行。其他逻辑没有细看

public void execute() {
    if (mainPromise.isCancelled()) {
        free();
        return;
    }
//redis 连接关闭
    if (!connectionManager.getShutdownLatch().acquire()) {
        free();
        //监听器发布取消事件
        mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));
        return;
    }

    codec = getCodec(codec);
    
    RFuture<RedisConnection> connectionFuture = getConnection();

    RPromise<R> attemptPromise = new RedissonPromise<R>();
    mainPromiseListener = (r, e) -> {
        if (mainPromise.isCancelled() && connectionFuture.cancel(false)) {
            log.debug("Connection obtaining canceled for {}", command);
            //看门狗注销
            timeout.cancel();
            //监听器修改状态为取消
            if (attemptPromise.cancel(false)) {
                free();
            }
        }
    };
    
    if (attempt == 0) {
        mainPromise.onComplete((r, e) -> {
            if (this.mainPromiseListener != null) {
                this.mainPromiseListener.accept(r, e);
            }
        });
    }
//redis 连接重试
    scheduleRetryTimeout(connectionFuture, attemptPromise);
//监听获取 redis 连接是否成功
    connectionFuture.onComplete((connection, e) -> {
        if (connectionFuture.isCancelled()) {
            connectionManager.getShutdownLatch().release();
            return;
        }
//redis 连接不成功释放资源,直接返回
        if (!connectionFuture.isSuccess()) {
            connectionManager.getShutdownLatch().release();
            exception = convertException(connectionFuture);
            return;
        }
//连接成功发送 lua 脚本到 redis 执行
        sendCommand(attemptPromise, connection);

        writeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                checkWriteFuture(writeFuture, attemptPromise, connection);
            }
        });
//释放 redis 连接
        releaseConnection(attemptPromise, connectionFuture);
    });
//监听器检查,释放 redis 连接的执行结果
    attemptPromise.onComplete((r, e) -> {
        checkAttemptPromise(attemptPromise, connectionFuture);
    });
}

加锁lua 脚本

直接贴源码中的Lua 脚本,主要是利用 redis 中的 hash 结构实现的。主要分俩种情况

  1. 线程第一次加锁(设置hash 键值对)
  2. 同一线程多次加锁(对 value 值累加)

因此Redisson 除了是一把互斥锁外还是一把可重入锁,此外 lua 脚本中的 redis.call 意思就是运行指定 redis 命令,return nil 代表返回默认值为 null。

//判断锁的键值对是否存在
if (redis.call('exists', KEYS[1]) == 0) then
//不存在,将锁写入 redis 并且设置过期时间
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end ;
//同一个线程第一次加锁成功,第二次加锁的时候会走到这,发现 hash 里面已经有锁的记录了
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
//记录锁的重入次数加 1,并且重新设置锁的过期时间
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end ;
//返回锁的过期时间
return redis.call('pttl', KEYS[1]);

解锁lua脚本

  1. 判断锁是否存在,存在无需解锁了
  2. 消除锁重入次数
  3. 直到重入次数全部被消除,删除锁

因此我们在使用多次 lock 的时候要配套多次使用 unlock,不然造成锁删不掉!!!!

-- 判断锁是否存在,存在无需解锁了
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end ;
-- 消除锁重入次数
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 重入次数大于 1,设置过期时间
if (counter > 0) then
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
    -- 直到重入次数全部被消除,删除锁
else
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end ;
return nil;

取消看门狗 timeOut

unlockInnerAsync里面会调用解锁的 lua 脚本,同时返回的RFuture是一个监听器,用来监听lua 脚本执行结果,当 lua 执行成功,触发 cancelExpirationRenewal方法,里面会获取到对应的Timeout,取消看门狗定时器

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    //调用解锁 lua 脚本
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
    //取消看门狗 timeOut 定时器
        cancelExpirationRenewal(threadId);

        if (e != null) {
        //发布失败事件
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }
//发布执行成功事件
        result.trySuccess(null);
    });

    return result;
}

void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    
    if (threadId != null) {
        task.removeThreadId(threadId);
    }

    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
        //取消看门狗
            timeout.cancel();
        }
        //同时从 EXPIRATION_RENEWAL_MAP 中移除
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}

到此Redisson的加锁和解锁流程都梳理好了。下面来讲讲Redisonn 源码中大量用到的Promise、timeOut 怎么进行使用的

io.netty.util.concurrent.Promise 监听器使用

代码如下,当Promise状态改变的时候,会触发onComplete中的方法。

public class main {
    private static final EventExecutor executor = ImmediateEventExecutor.INSTANCE;
    private static final Promise<String> promise = new DefaultPromise<>(executor);
    public static void main(String[] args) {
        executor.execute(() -> {
            try {
                //触发监听器
                promise.trySuccess(someAsyncOperation());
            } catch (Exception e) {
                promise.setFailure(e);
            }
        });
        //添加监听器,监听结果
        onComplete((ttlRemaining, e) -> {
            System.out.println("#Result: " + ttlRemaining);
        });
        //添加监听器,监听结果
        onComplete((ttlRemaining, e) -> {
            System.out.println("#Result2: " + ttlRemaining);
        });
    }
    public static void onComplete(BiConsumer<? super String, ? super Throwable> action) {
        promise.addListener(f -> {
            if (!f.isSuccess()) {
                action.accept(null, f.cause());
                return;
            }
            System.out.println("#addListener #onComplete");
            action.accept(String.valueOf(f.getNow()), null);
        });
    }
    private static String someAsyncOperation() throws Exception {
        Thread.sleep(1000);
        return "lua 脚本执行成功";
    }
}

io.netty.util.Timeout 使用

摘的源码,就是一个定时器,每隔 10s 递归执行一次

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            //执行解锁 lua 脚本
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                //解锁 lua 脚本执行成功,递归执行
                if (res) {
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

本文只剖析Redisson ,加锁、解锁、看门狗这部分的源码,至于涉及到Netty 的源码没有追溯,需要从 channelRead 的调用链一直往下 debug。链路很长。本文就到这结束!

Redisson源码研究(包括底层Netty、Promise实现)_redisson_02


举报

相关推荐

0 条评论