0
点赞
收藏
分享

微信扫一扫

ReentrantReadWriteLock之读锁加锁源码解析

ReentrantReadWriteLock是Java中读写锁的一种实现,他可以让多个线程同时读取共享资源——读共享,写入的时候只允许一个线程独占——写独占.了解过ReentrantLock的应该知道这个是一个可重入锁,允许一个线程可以多次获取相同类型的锁.在读操作多于写操作的场景下,读写锁能够提高读操作的并发性,同时保证写操作的独占性.

读锁的使用

ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
ReadLock readLock = reentrantReadWriteLock.readLock();
readLock.lock();

new ReentrantReadWriteLock()的时候源码

    public ReentrantReadWriteLock(boolean fair) {
        // 根据传进来的参数,来选择是否为公平锁
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

继续往下看,公平锁/公平锁都会继承Sync对象,这个对象的构造方法

Sync() {  
    // 
    readHolds = new ThreadLocalHoldCounter();
    // 这里有一个关于volatile的知识点:
    // 为什么说ensures visibility of readHolds?
    // 首先获取当前state的值,然后再将获取的state值赋值给state
    // state是被volatile修饰的,对于一个volatile变量的写操作,会保证其前面对普通变量的写操作对volatile写操作是可以见的.
    // 即:当第二个操作是volatile写时,无论第一个操作是什么,都不会发生指令重排
    setState(getState()); // ensures visibility of readHolds
}

先看下ThreadLocalHoldCounter类

    // ThreadLocal 的子类,其实就是一个ThreadLoca
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        // 重写了ThreadLocal的initialValue方法
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

再来看下HoldCounter类

static final class HoldCounter {
    // 持有的读锁数
    int count = 0;
    // 往里面看可以看到这里获取线程id并不是简单的使用Thread.getId()方法,
    // 而是 UNSAFE.getLongVolatile(thread, TID_OFFSET);
    // 根据注释可以知道,Thread.getId()不是被final修饰的,
    // 子类是可以重写该方法,这样就会导致映射不唯一.
    final long tid = getThreadId(Thread.currentThread());
}

下面来看看读锁的源码

    public void lock() {
        // sync是new ReentrantReadWriteLock()的时候进行初始化的.
        sync.acquireShared(1);
    }

public final void acquireShared(int arg) {
    // 尝试获取锁,当返回大于0的时候,代表获取锁成功
    if (tryAcquireShared(arg) < 0)
        // 获取锁失败,入队
        doAcquireShared(arg);
}

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    // 获取state
    int c = getState();
    // exclusiveCount(c):
    // 方法是Sync中的静态方法,用来判断是否有写锁(独占锁),后面会具体分析
    if (exclusiveCount(c) != 0 &&
        // exclusiveOwnerThread是AbstractOwnableSynchronizer的属性,
        // 记录了当前获取写锁的线程
        getExclusiveOwnerThread() != current)
        // 来到这里说明:a.有独占写锁,
        //			  b.持用写锁的并不是当前线程
        // 无法获取读锁,需要去排队
        // 否则,如果有独占锁,并且持有写锁的是当前线程,就可以获取写锁,即锁降级
        return -1;
    // sharedCount(c):
    // 和exclusiveCount(c)一样,是Sync的静态方法,用来判断读锁的状态
    int r = sharedCount(c);
    // readerShouldBlock():判断是否需要进入到阻塞队列,后面会详细分析
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        // SHARED_UNIT: 1左移16位,相当于c的高16位+1,后面会讲为什么会左移16位
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            // r = 0 说明此线程是第一个加读锁的线程
            // firstReader: Sync的属性,用来记录当前第一个获取读锁的线程
            // firstReaderHoldCount: Sync的属性,用来记录当前第一个获取读锁的线程获取锁的次数
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // firstReader == current:说明当前线程为第一个获取读锁的线程,直接count直接+1就行
            firstReaderHoldCount++;
        } else {
            // 来到这里说明:当前线程并不是firstReader记录的线程
            // cachedHoldCounter: Sync的属性,用来缓存最后一个获取读锁的信息的
            // HoldCounter:已经在上面讲过了,用来管理线程和与之对应的读锁的重入次数
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                // 设置cachedHoldCounter缓存,通过ThreadLocal获取
                // readHolds:其实就是ThreadLocal,上面讲过在Sync的构造方法执行的时候,进行初始化的.
                // 这里有get,那么就会有对应的set,后面会讲
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                // 来到这里说明:cachedHoldCounter里边缓存了当前线程获取锁的记录,
                // 但是可能是通过初始化readHolds.get()初始化获取的,因此count = 0
                readHolds.set(rh);
            // 给当前线程的读锁的重入次数+1
            rh.count++;
        }
        // 获取读锁成功
        return 1;
    }
    // 1.readerShouldBlock()方法之后需要进去到阻塞队列
    // 2.r超出了最大值
    // 3.竞争锁失败(cas失败)
    return fullTryAcquireShared(current);
}

在分析fullTryAcquireShared(current);之前需要对上面的部分代码进行解释.

在解释之前,需要学习下ReentrantReadWriteLock的实现原理

ReentrantReadWriteLock是读写可重入锁,相比ReentrantLock来说,都是通过state来实现锁的状态,大家都知道ReentrantLock的state=0代表可加锁,state>0,state等于几就代表重入的次数.

要想通过state一个属性来表示是否可以加锁和重入次数,以及是读锁还是写锁该如何实现呢?

ReentrantReadWriteLock里边,将 state 这个 32 位的 int 值分为高 16 位和低 16位,分别用于共享模式和独占模式。

ReentrantReadWriteLock之读锁加锁源码解析_初始化

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

 static int exclusiveCount(int c) { 
    // 取c的低16位
    return c & EXCLUSIVE_MASK; 
 }
 static int sharedCount(int c) { 
     // 取c的高16位
    return c >>> SHARED_SHIFT;
 }

readerShouldBlock()方法分为来公平方式和非公平方式.

FairSync:
 final boolean readerShouldBlock() {
     // 看队列中是否有在排队的,false——>不需要排队,ture——>需要排队
     return hasQueuedPredecessors();
 }

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

NonfairSync:
final boolean readerShouldBlock() {
    // 按道理非公平锁是不需要排队的,这里有一个优化,防止写锁饥饿等待
    return apparentlyFirstQueuedIsExclusive();
}

final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        // 如果头节点的下一个节点不是共享模式的节点(即是写操作),
        // 则即使当前有读锁,也需要进去等待队列进行排队
        !s.isShared()         &&
        s.thread != null;
}

下面来看最后一个方法:

fullTryAcquireShared(current),再次尝试获取锁.

上面列举了会执行该方法的三种情况:

1.需要进去到阻塞队列

2.r超出了最大值

3.竞争锁失败(cas失败)

(1)对于第一种情况,如果是因为在NonfairSync情况下,虽然head.next是获取写锁的,并且它等待了很久,我可以不和他抢,但是我是重入读锁,那就只能对不起了.

(2)对于第三种情况,因为cas失败,如果就此返回,就要进入到阻塞队列里了,会有些不甘心,因为已经满足了第一种情况了,所以进入到这个方法其实是增加CAS成功的机会.

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    // 自璇
    for (;;) {
        // 获取当前的state
        int c = getState();
        // 再来判断一次,可能由其他情况进入到这个方法中的.
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            // 就是上面说的第一种情况,需要判断是否是重入的
            if (firstReader == current) {
                // firstReader记录的是当前线程,满足重入条件
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            // 防止是因为上面readHolds.get();初始化出来的
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    // count == 0,显然不符合重入条件
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 处理重入
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}


举报

相关推荐

0 条评论