0
点赞
收藏
分享

微信扫一扫

【JUC系列】LOCK框架系列之七 核心锁类之ReentrantReadWriteLock

佃成成成成 2022-04-30 阅读 56
java

ReentrantReadWriteLock

文章目录

什么是读写锁

读写锁支持支持多个线程同时读,但是当一个写线程访问的时候会阻塞其他所有写和读的线程。

读写锁同时拥有写锁和读锁,通过分离读写锁提高程序的执行效率,在读多写少的场景中,并发性能比单一的排他锁有明显提升。

核心实现思想

读写锁内部重新定义了AQS中的同步状态变量的state。从低16位(0-15)用来记录写锁,用高16位(16-31)记录读锁。写锁的16位可以用来表示写锁被重入的次数。读锁的16位既可以表示一个读线程的重入次数,也可以表示多少个线程获得了读锁。state==0表示既无写锁又无读锁。

        // 共享式的位移量 读锁的使用的位数
        static final int SHARED_SHIFT   = 16;
        // 将1左移16位,结果作为,高位读锁(共享)的计算基本单位 65536
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 锁的最大值 65535
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 写(独占)锁的掩码 65535
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
		// 通过无符号右移快速查询读锁个数
		static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
		// 通过位与运算快速判断写锁个数
		static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
                  -------------------  -------------------
              :   0000 0000 0000 0000  0000 0000 0000 0001
SHARED_UNIT   :   0000 0000 0000 0001  0000 0000 0000 0000
EXCLUSIVE_MASK:   0000 0000 0000 0000  1111 1111 1111 1111

计算演示

sharedCount(65536) ,通过以下计算得知:此时的读锁个数是1个。

-------------------  -------------------
0000 0000 0000 0001  0000 0000 0000 0000   // 65536的二进制
0000 0000 0000 0000  0000 0000 0000 0001   // 65536>>>16的值为1

exclusiveCount(5),通过以下计算得知:此时的读锁个数是5个。

-------------------  -------------------
0000 0000 0000 0000  0000 0000 0000 0101   // 5的二进制
0000 0000 0000 0000  1111 1111 1111 1111   // EXCLUSIVE_MASK 65535
0000 0000 0000 0000  0000 0000 0000 0101   // 5 & 65535 = 5

读写锁的特性

特性说明
公平性支持公平锁与非公平锁,默认非公平模式,吞吐量还是非公平高于公平
重入性该锁支持重进入,以读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取写锁之后,能够再次获取写锁,同时可以获取读锁
锁降级遵循读写锁、获取读锁在释放写锁的次序,写锁能够降级为读锁

读写锁的组成

类图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传站可能有防盗链机制,建议将图片保存下来直接上传(img-BfltwiOH-1650612392958)(images\image-20220411111402223.png)]

构造函数

在构造函数中会去构造ReadLock和WriteLock,WriteLock和ReadLock使用的是同一个AQS对象(Sync),这样就具备AQS的特性且互斥。

    /**
    * 无参构造函数 默认采用非公平策略
    */
	public ReentrantReadWriteLock() {
        this(false);
    }

    /**
    * 含参构造函数 false非公平策略 true公平策略
    */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

	// 读锁
    public static class ReadLock implements Lock, java.io.Serializable {
      
        private final Sync sync;

        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }

	// 写锁
	public static class WriteLock implements Lock, java.io.Serializable {
      
        private final Sync sync;

        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }

内部类Sync

构造方法

主要是对读锁计数器的初始化。

        Sync() {
            // 初始化本地线程计数器
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        }

		// 利用ThreadLocal来存储本地线程计数器
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
		// 本地线程计数器 count用来存储数量 tid代表线程的ID值
        static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }
主要成员
        // 共享式的位移量 读锁的使用的位数
        static final int SHARED_SHIFT   = 16;
        // 将1左移16位,结果作为,高位读锁(共享)的计算基本单位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 读(共享)锁的最大值
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 写(独占)锁的最大值
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
 		// 本地线程计数器
        private transient ThreadLocalHoldCounter readHolds;
        // 第一个读线程
        private transient Thread firstReader = null;
        // 第一个读线程的计数
        private transient int firstReaderHoldCount;
        // 缓存的计数器
        private transient HoldCounter cachedHoldCounter;
主要方法
方法名说明
int sharedCount(int c)
int exclusiveCount(int c)
abstract boolean readerShouldBlock();读请求时是否阻塞由公平和非公平去实现。公平模式下,判断是否存在队列且在其他结点之后,如果有前驱结点,则返回true,阻塞获取读锁。非公平模式下,如果其他线程获取了写锁,则返回true 阻塞当前线程获取读锁。
abstract boolean writerShouldBlock();写请求时是否阻塞由公平和非公平去实现。公平模式下,判断是否存在队列且在其他结点之后,如果有前驱结点,则返回true,阻塞获取写锁。非公平模式下,直接不阻塞,返回false。
final boolean tryRelease(int releases)
final boolean tryAcquire(int acquires)
final boolean tryReleaseShared(int unused)
final int tryAcquireShared(int unused)
final int fullTryAcquireShared(Thread current)
final boolean tryWriteLock()
final boolean tryReadLock()
final int getReadLockCount()返回当前读锁被获取的次数。该次数不等于获取读锁的线程数,一个线程连续获取了(重入)n次读书,那么这个方法的返回值是n,但是线程数只有1。
final boolean isWriteLocked()判断写锁是否已经被获取
final int getWriteHoldCount()返回当前写锁被获取的次数
final int getReadHoldCount()返回当前线程获取读锁的次数。
final int getCount()
Sync完整源码
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;

        /*
         * Read vs write count extraction constants and functions.
         * Lock state is logically divided into two unsigned shorts:
         * The lower one representing the exclusive (writer) lock hold count,
         * and the upper the shared (reader) hold count.
         * 锁的状态在逻辑上使用两个unsigned shorts:低位的表示独占(写入)锁保持计数,高位的表示共享(读取)锁保持计数。简单来说就是使用32位逻辑上被拆分成了两个16位,高16位存储读锁的状态 低32位存储写锁的状态
         */

        // 共享式的位移量 读锁的使用的位数
        static final int SHARED_SHIFT   = 16;
        // 将1左移16位,结果作为,高位读锁(共享)的计算基本单位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 锁的最大值
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 写(独占)锁的掩码 用来快速计算写锁
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  占有读锁的线程数量*/
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  占有写锁的线程数量*/
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

        /**
         * A counter for per-thread read hold counts.
         * Maintained as a ThreadLocal; cached in cachedHoldCounter
         */
        static final class HoldCounter {
            int count = 0;
            // 使用线程ID,而不是引用,避免垃圾无法回收
            final long tid = getThreadId(Thread.currentThread());
        }

        /**
         * ThreadLocal subclass. Easiest to explicitly define for sake
         * of deserialization mechanics.
         */
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        /**
         * The number of reentrant read locks held by current thread.
         * Initialized only in constructor and readObject.
         * Removed whenever a thread's read hold count drops to 0.
         // 本地线程计数器
         */
        private transient ThreadLocalHoldCounter readHolds;

        /**
         * The hold count of the last thread to successfully acquire
         * readLock. This saves ThreadLocal lookup in the common case
         * where the next thread to release is the last one to
         * acquire. This is non-volatile since it is just used
         * as a heuristic, and would be great for threads to cache.
         *
         * <p>Can outlive the Thread for which it is caching the read
         * hold count, but avoids garbage retention by not retaining a
         * reference to the Thread.
         *
         * <p>Accessed via a benign data race; relies on the memory
         * model's final field and out-of-thin-air guarantees.
         // 缓存的计数器
         */
        private transient HoldCounter cachedHoldCounter;

        /**
         * firstReader is the first thread to have acquired the read lock.
         * firstReaderHoldCount is firstReader's hold count.
         *
         * <p>More precisely, firstReader is the unique thread that last
         * changed the shared count from 0 to 1, and has not released the
         * read lock since then; null if there is no such thread.
         *
         * <p>Cannot cause garbage retention unless the thread terminated
         * without relinquishing its read locks, since tryReleaseShared
         * sets it to null.
         *
         * <p>Accessed via a benign data race; relies on the memory
         * model's out-of-thin-air guarantees for references.
         *
         * <p>This allows tracking of read holds for uncontended read
         * locks to be very cheap.
         */
        // 第一个读线程
        private transient Thread firstReader = null;
        // 第一个读线程的计数
        private transient int firstReaderHoldCount;

        Sync() {
            readHolds = new ThreadLocalHoldCounter();
            // 设置AQS的状态
            setState(getState()); // ensures visibility of readHolds
        }

        /*
         * Acquires and releases use the same code for fair and
         * nonfair locks, but differ in whether/how they allow barging
         * when queues are non-empty.
         */

        /**
         * Returns true if the current thread, when trying to acquire
         * the read lock, and otherwise eligible to do so, should block
         * because of policy for overtaking other waiting threads.
         */
        abstract boolean readerShouldBlock();

        /**
         * Returns true if the current thread, when trying to acquire
         * the write lock, and otherwise eligible to do so, should block
         * because of policy for overtaking other waiting threads.
         */
        abstract boolean writerShouldBlock();

        /*
         * Note that tryRelease and tryAcquire can be called by
         * Conditions. So it is possible that their arguments contain
         * both read and write holds that are all released during a
         * condition wait and re-established in tryAcquire.
         */
        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

        private IllegalMonitorStateException unmatchedUnlockException() {
            return new IllegalMonitorStateException(
                "attempt to unlock read lock, not locked by current thread");
        }

        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

        /**
         * Full version of acquire for reads, that handles CAS misses
         * and reentrant reads not dealt with in tryAcquireShared.
         */
        final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

        /**
         * Performs tryLock for write, enabling barging in both modes.
         * This is identical in effect to tryAcquire except for lack
         * of calls to writerShouldBlock.
         */
        final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

        /**
         * Performs tryLock for read, enabling barging in both modes.
         * This is identical in effect to tryAcquireShared except for
         * lack of calls to readerShouldBlock.
         */
        final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        // Methods relayed to outer class

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        final Thread getOwner() {
            // Must read state before owner to ensure memory consistency
            return ((exclusiveCount(getState()) == 0) ?
                    null :
                    getExclusiveOwnerThread());
        }

        final int getReadLockCount() {
            return sharedCount(getState());
        }

        final boolean isWriteLocked() {
            return exclusiveCount(getState()) != 0;
        }

        final int getWriteHoldCount() {
            return isHeldExclusively() ? exclusiveCount(getState()) : 0;
        }

        final int getReadHoldCount() {
            if (getReadLockCount() == 0)
                return 0;

            Thread current = Thread.currentThread();
            if (firstReader == current)
                return firstReaderHoldCount;

            HoldCounter rh = cachedHoldCounter;
            if (rh != null && rh.tid == getThreadId(current))
                return rh.count;

            int count = readHolds.get().count;
            if (count == 0) readHolds.remove();
            return count;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            readHolds = new ThreadLocalHoldCounter();
            setState(0); // reset to unlocked state
        }

        final int getCount() { return getState(); }
    }

NonFairSync

继承了Sync,主要实现writerShouldBlock()readerShouldBlock()

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        // 非公平模式下,线程在竞争写锁的之前,不会被阻塞
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        // 非公平模式下,线程在竞争读锁的之前,如果队列中第一个是写锁,则被阻塞
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        }
    }

FairSync

继承了Sync,主要实现writerShouldBlock()readerShouldBlock()

static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        // 公平模式下,线程在竞争写锁的之前,如果队列中有其他线程在排队,则当前线程被阻塞
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        // 公平模式下,线程在竞争读锁的之前,如果队列中有其他线程在排队,则当前线程被阻塞
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

写锁的获取与释放

由于写锁的操作要对读锁可见,如果读锁已经被获取了,写锁再被获取的话,就无法保证其他读线程感知到当前写线程的操作。

tryAcquire(int acquires)

        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:演练
             * 1. If read count nonzero or write count nonzero 如果读取计数非零或写入计数非零并且所有者是不同的线程,则失败。
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only 如果计数饱和,则失败。 (这只有在 count 已经非零时才会发生。)
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if 否则,如果该线程是可重入获取或队列策略允许,则该线程有资格获得锁。如果是,更新state并设置owner。
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            // 获取线程
            Thread current = Thread.currentThread();
            // 获取同步状态值
            int c = getState();
            // 获取写锁的计数
            int w = exclusiveCount(c);
            // 如果不为0,说明存在锁,但是不知是何种锁
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                // 1.写锁计数为0(w==0)。说明此时只有读锁,不能将读锁升级为写锁,所以直接返回false。
                // 2.写锁计数不为0,但不是当前线程持有的写锁(current != getExclusiveOwnerThread())。直接返回false。
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 计算写锁的数量,如果溢出了,就抛出异常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                // 执行到这里,说明肯定是当前线程持有的写锁,那么此时没有线程竞争,更新写锁计数
                setState(c + acquires);
                return true;
            }
            // 说明此时c为0,无锁状态中。
            // writerShouldBlock在公平模式下,判断是否存在队列且在其他结点之后,如果有前驱结点,则返回true,阻塞获取写锁。非公平模式下,直接不阻塞,尝试进行写锁的CAS操作失败。则返回false;在非公平模式下,不被阻塞。
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            // 设置当前占有者
            setExclusiveOwnerThread(current);
            return true;
        }

tryRelease(int releases)

        protected final boolean tryRelease(int releases) {
            // 如果当前线程不是锁的占有者,直接异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 计算同步状态的值
            int nextc = getState() - releases;
            // 判断写锁是否已经全部释放
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                // 写锁完全释放需要情况锁的占用者
                setExclusiveOwnerThread(null);
            // 更新state
            setState(nextc);
            return free;
        }

读锁的获取与释放

tryAcquireShared(int unused)

        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough: 演练
             * 1. If write lock held by another thread, fail. 如果写锁被另一个线程持有,则失败。
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 否则,这个线程有资格获得锁wrt状态,所以询问它是否应该因为队列策略而阻塞。如果没有,尝试通过CAS状态和更新计数授予。注意,step没有检查可重入获取,这被推迟到完整版本,以避免在更典型的不可重入情况下必须检查hold count。
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             * 如果由于线程显然不符合条件或 CAS 失败或计数已饱和,步骤 2 失败,则链接到具有完整重试循环的版本。
             */
            // 获取当前线程
            Thread current = Thread.currentThread();
            // 获取同步状态值
            int c = getState();
            // 如果有写锁且当前线程并不是拥有写锁的线程,获取失败
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 获得读锁的计数
            int r = sharedCount(c);
            // readerShouldBlock在公平模式下,判断是否存在队列且在其他结点之后,如果有前驱结点,则返回true,阻塞获取读锁。非公平模式下,如果其他线程获取了写锁,则返回true 阻塞当前线程获取读锁。读锁的计数在合理返回内,且CAS操作更新同步状态
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                // 此时,当前线程已经获得了读锁
                
                // 如果成立,说明当前线程是第一个获取到读锁的线程,设置一下firstReader和firstReaderHoldCount。
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // 当前线程就是第一个获得该读锁的线程,则firstReaderHoldCount增加1
                    firstReaderHoldCount++;
                } else {
                    // 获得的HoldCounter指不定是哪个线程的HoldCounter
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        // 如果为null说明从来没有设置过AQS的cachedHoldCounter
                        //如果不为null但线程id不是当前的,说明重新设置
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        //如果cachedHoldCounter的线程id就是当前线程id,且count为0
                        readHolds.set(rh);
                    //不管怎样,局部变量rh的count都要加1
                    rh.count++;
                }
                return 1;
            }
            // 如果获取读锁被阻塞 或CAS操作失败 或者当前读锁的计数已经最大,则需要进行全面尝试获取锁
            return fullTryAcquireShared(current);
        }

fullTryAcquireShared(Thread current)

        final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             * 此代码与 tryAcquireShared 中的代码部分冗余,但总体上更简单,因为不会使 tryAcquireShared 与重试和延迟读取保持计数之间的交互复杂化。
             */
            //只要rh不为null,那么它一定指向当前线程的HoldCounter对象
            HoldCounter rh = null;
            // 无线循环--尝试获取读锁
            for (;;) {
                int c = getState();
                // 若存在写锁且拥有写锁的线程不是当前线程,则直接失败。如果写锁是当前线程那就继续执行
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // 第一个获取读锁的是当前线程,且尚未完全释放读锁
                        // assert firstReaderHoldCount > 0;
                    } else {
                        
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();//当前线程如果没有获得读锁,get到的count值为0
                                if (rh.count == 0)
                                    //当前线程没有获得读锁时,本来它的HoldCounter成员本来就应该为null,所以要remove
                                    readHolds.remove();
                            }
                        }
                        //rh局部变量还保留着当前线程的HoldCounter成员的引用
                        if (rh.count == 0)
                            return -1;
                    }
                }
                // 读锁计数的验证
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // CAS操作获取读锁
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

tryReleaseShared(int unused)

        protected final boolean tryReleaseShared(int unused) {
            // 获取当前线程
            Thread current = Thread.currentThread();
            // 如果当前线程是第一个获取读锁的线程,则进行读锁的部分释放与完全释放的选择
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                // 获取当前线程的HoldCounter
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    // 无锁需要释放了,直接异常
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                //执行到这里,说明当前线程持有读锁中,那么减小读锁计数1
                --rh.count;
            }
            //无限循环-CAS修改同步状态
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    //只有在读写锁都是干净的情况,才返回true
                    return nextc == 0;
            }
        }

锁的降级

如果当前线程不获取读锁直接释放写锁,在其使用数据期间,就无法感知其他线程获取了写锁并修改了之后的数据,在对数据比较敏感的场景下,为了保证数据的可见性,在当前线程使用完数据之后,写锁才能被其他线程获取,这样需要一个机制来满足这个需求:锁降级。

锁降级指当前线程同时持有读锁和写锁后,先释放了写锁,使得写锁降级为了读锁。

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockDegrade {

    private  int i=0;

    private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(false);
    private final Lock readLock = readWriteLock.readLock();
    private final Lock writeLock = readWriteLock.writeLock();


    public void processData() {
        writeLock.lock();
        try {
            i++;
            readLock.lock();
        } finally {
            writeLock.unlock();
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            if (i == 1) {
                System.out.println("获取锁的当前线程[" + Thread.currentThread().getName() + "] i的值是" + i);
            } else {
                System.out.println("获取锁的当前线程[" + Thread.currentThread().getName() + "] i=" + i);
            }
        } finally {
            readLock.unlock();
        }
    }

    public static void main(String[] args) {
        LockDegrade demo = new LockDegrade();
        for (int x = 0; x < 5; x++) {
            new Thread(() -> {
                demo.processData();
            }).start();
        }
    }
}

锁降级结果

获取锁的当前线程[Thread-1] i的值是1
获取锁的当前线程[Thread-0] i=2
获取锁的当前线程[Thread-2] i=3
获取锁的当前线程[Thread-3] i=4
获取锁的当前线程[Thread-4] i=5

如果将17行和34行注释掉,执行结果

获取锁的当前线程[Thread-2] i=5
获取锁的当前线程[Thread-0] i=5
获取锁的当前线程[Thread-1] i=5
获取锁的当前线程[Thread-3] i=5
获取锁的当前线程[Thread-4] i=5

  • 为了有了ReentrantLock还需要ReentrantReadWriteLock?
  • ReentrantReadWriteLock底层实现原理?
  • ReentrantReadWriteLock底层读写状态如何设计的? 高16位为读锁,低16位为写锁
  • 读锁和写锁的最大数量是多少?
  • 本地线程计数器ThreadLocalHoldCounter是用来做什么的?
  • 缓存计数器HoldCounter是用来做什么的?
  • 写锁的获取与释放是怎么实现的?
  • 读锁的获取与释放是怎么实现的?
  • RentrantReadWriteLock为什么不支持锁升级?
  • 什么是锁的升降级? RentrantReadWriteLock为什么不支持锁升级?
举报

相关推荐

0 条评论