ReentrantReadWriteLock
文章目录
什么是读写锁
读写锁支持支持多个线程同时读,但是当一个写线程访问的时候会阻塞其他所有写和读的线程。
读写锁同时拥有写锁和读锁,通过分离读写锁提高程序的执行效率,在读多写少的场景中,并发性能比单一的排他锁有明显提升。
核心实现思想
读写锁内部重新定义了AQS中的同步状态变量的state。从低16位(0-15)用来记录写锁,用高16位(16-31)记录读锁。写锁的16位可以用来表示写锁被重入的次数。读锁的16位既可以表示一个读线程的重入次数,也可以表示多少个线程获得了读锁。state==0
表示既无写锁又无读锁。
// 共享式的位移量 读锁的使用的位数
static final int SHARED_SHIFT = 16;
// 将1左移16位,结果作为,高位读锁(共享)的计算基本单位 65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 锁的最大值 65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 写(独占)锁的掩码 65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 通过无符号右移快速查询读锁个数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 通过位与运算快速判断写锁个数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
------------------- -------------------
: 0000 0000 0000 0000 0000 0000 0000 0001
SHARED_UNIT : 0000 0000 0000 0001 0000 0000 0000 0000
EXCLUSIVE_MASK: 0000 0000 0000 0000 1111 1111 1111 1111
计算演示
sharedCount(65536) ,通过以下计算得知:此时的读锁个数是1个。
------------------- -------------------
0000 0000 0000 0001 0000 0000 0000 0000 // 65536的二进制
0000 0000 0000 0000 0000 0000 0000 0001 // 65536>>>16的值为1
exclusiveCount(5),通过以下计算得知:此时的读锁个数是5个。
------------------- -------------------
0000 0000 0000 0000 0000 0000 0000 0101 // 5的二进制
0000 0000 0000 0000 1111 1111 1111 1111 // EXCLUSIVE_MASK 65535
0000 0000 0000 0000 0000 0000 0000 0101 // 5 & 65535 = 5
读写锁的特性
特性 | 说明 |
---|---|
公平性 | 支持公平锁与非公平锁,默认非公平模式,吞吐量还是非公平高于公平 |
重入性 | 该锁支持重进入,以读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取写锁之后,能够再次获取写锁,同时可以获取读锁 |
锁降级 | 遵循读写锁、获取读锁在释放写锁的次序,写锁能够降级为读锁 |
读写锁的组成
类图
构造函数
在构造函数中会去构造ReadLock和WriteLock,WriteLock和ReadLock使用的是同一个AQS对象(Sync),这样就具备AQS的特性且互斥。
/**
* 无参构造函数 默认采用非公平策略
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* 含参构造函数 false非公平策略 true公平策略
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
// 读锁
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
// 写锁
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
内部类Sync
构造方法
主要是对读锁计数器的初始化。
Sync() {
// 初始化本地线程计数器
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
// 利用ThreadLocal来存储本地线程计数器
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 本地线程计数器 count用来存储数量 tid代表线程的ID值
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
主要成员
// 共享式的位移量 读锁的使用的位数
static final int SHARED_SHIFT = 16;
// 将1左移16位,结果作为,高位读锁(共享)的计算基本单位
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 读(共享)锁的最大值
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 写(独占)锁的最大值
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 本地线程计数器
private transient ThreadLocalHoldCounter readHolds;
// 第一个读线程
private transient Thread firstReader = null;
// 第一个读线程的计数
private transient int firstReaderHoldCount;
// 缓存的计数器
private transient HoldCounter cachedHoldCounter;
主要方法
方法名 | 说明 |
---|---|
int sharedCount(int c) | |
int exclusiveCount(int c) | |
abstract boolean readerShouldBlock(); | 读请求时是否阻塞由公平和非公平去实现。公平模式下,判断是否存在队列且在其他结点之后,如果有前驱结点,则返回true,阻塞获取读锁。非公平模式下,如果其他线程获取了写锁,则返回true 阻塞当前线程获取读锁。 |
abstract boolean writerShouldBlock(); | 写请求时是否阻塞由公平和非公平去实现。公平模式下,判断是否存在队列且在其他结点之后,如果有前驱结点,则返回true,阻塞获取写锁。非公平模式下,直接不阻塞,返回false。 |
final boolean tryRelease(int releases) | |
final boolean tryAcquire(int acquires) | |
final boolean tryReleaseShared(int unused) | |
final int tryAcquireShared(int unused) | |
final int fullTryAcquireShared(Thread current) | |
final boolean tryWriteLock() | |
final boolean tryReadLock() | |
final int getReadLockCount() | 返回当前读锁被获取的次数。该次数不等于获取读锁的线程数,一个线程连续获取了(重入)n次读书,那么这个方法的返回值是n,但是线程数只有1。 |
final boolean isWriteLocked() | 判断写锁是否已经被获取 |
final int getWriteHoldCount() | 返回当前写锁被获取的次数 |
final int getReadHoldCount() | 返回当前线程获取读锁的次数。 |
final int getCount() |
Sync完整源码
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
* 锁的状态在逻辑上使用两个unsigned shorts:低位的表示独占(写入)锁保持计数,高位的表示共享(读取)锁保持计数。简单来说就是使用32位逻辑上被拆分成了两个16位,高16位存储读锁的状态 低32位存储写锁的状态
*/
// 共享式的位移量 读锁的使用的位数
static final int SHARED_SHIFT = 16;
// 将1左移16位,结果作为,高位读锁(共享)的计算基本单位
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 锁的最大值
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 写(独占)锁的掩码 用来快速计算写锁
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count 占有读锁的线程数量*/
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count 占有写锁的线程数量*/
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
static final class HoldCounter {
int count = 0;
// 使用线程ID,而不是引用,避免垃圾无法回收
final long tid = getThreadId(Thread.currentThread());
}
/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
/**
* The number of reentrant read locks held by current thread.
* Initialized only in constructor and readObject.
* Removed whenever a thread's read hold count drops to 0.
// 本地线程计数器
*/
private transient ThreadLocalHoldCounter readHolds;
/**
* The hold count of the last thread to successfully acquire
* readLock. This saves ThreadLocal lookup in the common case
* where the next thread to release is the last one to
* acquire. This is non-volatile since it is just used
* as a heuristic, and would be great for threads to cache.
*
* <p>Can outlive the Thread for which it is caching the read
* hold count, but avoids garbage retention by not retaining a
* reference to the Thread.
*
* <p>Accessed via a benign data race; relies on the memory
* model's final field and out-of-thin-air guarantees.
// 缓存的计数器
*/
private transient HoldCounter cachedHoldCounter;
/**
* firstReader is the first thread to have acquired the read lock.
* firstReaderHoldCount is firstReader's hold count.
*
* <p>More precisely, firstReader is the unique thread that last
* changed the shared count from 0 to 1, and has not released the
* read lock since then; null if there is no such thread.
*
* <p>Cannot cause garbage retention unless the thread terminated
* without relinquishing its read locks, since tryReleaseShared
* sets it to null.
*
* <p>Accessed via a benign data race; relies on the memory
* model's out-of-thin-air guarantees for references.
*
* <p>This allows tracking of read holds for uncontended read
* locks to be very cheap.
*/
// 第一个读线程
private transient Thread firstReader = null;
// 第一个读线程的计数
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
// 设置AQS的状态
setState(getState()); // ensures visibility of readHolds
}
/*
* Acquires and releases use the same code for fair and
* nonfair locks, but differ in whether/how they allow barging
* when queues are non-empty.
*/
/**
* Returns true if the current thread, when trying to acquire
* the read lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean readerShouldBlock();
/**
* Returns true if the current thread, when trying to acquire
* the write lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean writerShouldBlock();
/*
* Note that tryRelease and tryAcquire can be called by
* Conditions. So it is possible that their arguments contain
* both read and write holds that are all released during a
* condition wait and re-established in tryAcquire.
*/
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock, not locked by current thread");
}
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
/**
* Performs tryLock for write, enabling barging in both modes.
* This is identical in effect to tryAcquire except for lack
* of calls to writerShouldBlock.
*/
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
/**
* Performs tryLock for read, enabling barging in both modes.
* This is identical in effect to tryAcquireShared except for
* lack of calls to readerShouldBlock.
*/
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
// Methods relayed to outer class
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
// Must read state before owner to ensure memory consistency
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}
final int getReadLockCount() {
return sharedCount(getState());
}
final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}
final int getCount() { return getState(); }
}
NonFairSync
继承了Sync,主要实现writerShouldBlock()
和readerShouldBlock()
。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
// 非公平模式下,线程在竞争写锁的之前,不会被阻塞
final boolean writerShouldBlock() {
return false; // writers can always barge
}
// 非公平模式下,线程在竞争读锁的之前,如果队列中第一个是写锁,则被阻塞
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
FairSync
继承了Sync,主要实现writerShouldBlock()
和readerShouldBlock()
。
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
// 公平模式下,线程在竞争写锁的之前,如果队列中有其他线程在排队,则当前线程被阻塞
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
// 公平模式下,线程在竞争读锁的之前,如果队列中有其他线程在排队,则当前线程被阻塞
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
写锁的获取与释放
由于写锁的操作要对读锁可见,如果读锁已经被获取了,写锁再被获取的话,就无法保证其他读线程感知到当前写线程的操作。
tryAcquire(int acquires)
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:演练
* 1. If read count nonzero or write count nonzero 如果读取计数非零或写入计数非零并且所有者是不同的线程,则失败。
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only 如果计数饱和,则失败。 (这只有在 count 已经非零时才会发生。)
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if 否则,如果该线程是可重入获取或队列策略允许,则该线程有资格获得锁。如果是,更新state并设置owner。
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
// 获取线程
Thread current = Thread.currentThread();
// 获取同步状态值
int c = getState();
// 获取写锁的计数
int w = exclusiveCount(c);
// 如果不为0,说明存在锁,但是不知是何种锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 1.写锁计数为0(w==0)。说明此时只有读锁,不能将读锁升级为写锁,所以直接返回false。
// 2.写锁计数不为0,但不是当前线程持有的写锁(current != getExclusiveOwnerThread())。直接返回false。
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 计算写锁的数量,如果溢出了,就抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 执行到这里,说明肯定是当前线程持有的写锁,那么此时没有线程竞争,更新写锁计数
setState(c + acquires);
return true;
}
// 说明此时c为0,无锁状态中。
// writerShouldBlock在公平模式下,判断是否存在队列且在其他结点之后,如果有前驱结点,则返回true,阻塞获取写锁。非公平模式下,直接不阻塞,尝试进行写锁的CAS操作失败。则返回false;在非公平模式下,不被阻塞。
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 设置当前占有者
setExclusiveOwnerThread(current);
return true;
}
tryRelease(int releases)
protected final boolean tryRelease(int releases) {
// 如果当前线程不是锁的占有者,直接异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 计算同步状态的值
int nextc = getState() - releases;
// 判断写锁是否已经全部释放
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 写锁完全释放需要情况锁的占用者
setExclusiveOwnerThread(null);
// 更新state
setState(nextc);
return free;
}
读锁的获取与释放
tryAcquireShared(int unused)
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough: 演练
* 1. If write lock held by another thread, fail. 如果写锁被另一个线程持有,则失败。
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 否则,这个线程有资格获得锁wrt状态,所以询问它是否应该因为队列策略而阻塞。如果没有,尝试通过CAS状态和更新计数授予。注意,step没有检查可重入获取,这被推迟到完整版本,以避免在更典型的不可重入情况下必须检查hold count。
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
* 如果由于线程显然不符合条件或 CAS 失败或计数已饱和,步骤 2 失败,则链接到具有完整重试循环的版本。
*/
// 获取当前线程
Thread current = Thread.currentThread();
// 获取同步状态值
int c = getState();
// 如果有写锁且当前线程并不是拥有写锁的线程,获取失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获得读锁的计数
int r = sharedCount(c);
// readerShouldBlock在公平模式下,判断是否存在队列且在其他结点之后,如果有前驱结点,则返回true,阻塞获取读锁。非公平模式下,如果其他线程获取了写锁,则返回true 阻塞当前线程获取读锁。读锁的计数在合理返回内,且CAS操作更新同步状态
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 此时,当前线程已经获得了读锁
// 如果成立,说明当前线程是第一个获取到读锁的线程,设置一下firstReader和firstReaderHoldCount。
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 当前线程就是第一个获得该读锁的线程,则firstReaderHoldCount增加1
firstReaderHoldCount++;
} else {
// 获得的HoldCounter指不定是哪个线程的HoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
// 如果为null说明从来没有设置过AQS的cachedHoldCounter
//如果不为null但线程id不是当前的,说明重新设置
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
//如果cachedHoldCounter的线程id就是当前线程id,且count为0
readHolds.set(rh);
//不管怎样,局部变量rh的count都要加1
rh.count++;
}
return 1;
}
// 如果获取读锁被阻塞 或CAS操作失败 或者当前读锁的计数已经最大,则需要进行全面尝试获取锁
return fullTryAcquireShared(current);
}
fullTryAcquireShared(Thread current)
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
* 此代码与 tryAcquireShared 中的代码部分冗余,但总体上更简单,因为不会使 tryAcquireShared 与重试和延迟读取保持计数之间的交互复杂化。
*/
//只要rh不为null,那么它一定指向当前线程的HoldCounter对象
HoldCounter rh = null;
// 无线循环--尝试获取读锁
for (;;) {
int c = getState();
// 若存在写锁且拥有写锁的线程不是当前线程,则直接失败。如果写锁是当前线程那就继续执行
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// 第一个获取读锁的是当前线程,且尚未完全释放读锁
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();//当前线程如果没有获得读锁,get到的count值为0
if (rh.count == 0)
//当前线程没有获得读锁时,本来它的HoldCounter成员本来就应该为null,所以要remove
readHolds.remove();
}
}
//rh局部变量还保留着当前线程的HoldCounter成员的引用
if (rh.count == 0)
return -1;
}
}
// 读锁计数的验证
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS操作获取读锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
tryReleaseShared(int unused)
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 如果当前线程是第一个获取读锁的线程,则进行读锁的部分释放与完全释放的选择
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 获取当前线程的HoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
// 无锁需要释放了,直接异常
if (count <= 0)
throw unmatchedUnlockException();
}
//执行到这里,说明当前线程持有读锁中,那么减小读锁计数1
--rh.count;
}
//无限循环-CAS修改同步状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
//只有在读写锁都是干净的情况,才返回true
return nextc == 0;
}
}
锁的降级
如果当前线程不获取读锁直接释放写锁,在其使用数据期间,就无法感知其他线程获取了写锁并修改了之后的数据,在对数据比较敏感的场景下,为了保证数据的可见性,在当前线程使用完数据之后,写锁才能被其他线程获取,这样需要一个机制来满足这个需求:锁降级。
锁降级指当前线程同时持有读锁和写锁后,先释放了写锁,使得写锁降级为了读锁。
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class LockDegrade {
private int i=0;
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(false);
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
public void processData() {
writeLock.lock();
try {
i++;
readLock.lock();
} finally {
writeLock.unlock();
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
if (i == 1) {
System.out.println("获取锁的当前线程[" + Thread.currentThread().getName() + "] i的值是" + i);
} else {
System.out.println("获取锁的当前线程[" + Thread.currentThread().getName() + "] i=" + i);
}
} finally {
readLock.unlock();
}
}
public static void main(String[] args) {
LockDegrade demo = new LockDegrade();
for (int x = 0; x < 5; x++) {
new Thread(() -> {
demo.processData();
}).start();
}
}
}
锁降级结果
获取锁的当前线程[Thread-1] i的值是1
获取锁的当前线程[Thread-0] i=2
获取锁的当前线程[Thread-2] i=3
获取锁的当前线程[Thread-3] i=4
获取锁的当前线程[Thread-4] i=5
如果将17行和34行注释掉,执行结果
获取锁的当前线程[Thread-2] i=5
获取锁的当前线程[Thread-0] i=5
获取锁的当前线程[Thread-1] i=5
获取锁的当前线程[Thread-3] i=5
获取锁的当前线程[Thread-4] i=5
- 为了有了ReentrantLock还需要ReentrantReadWriteLock?
- ReentrantReadWriteLock底层实现原理?
- ReentrantReadWriteLock底层读写状态如何设计的? 高16位为读锁,低16位为写锁
- 读锁和写锁的最大数量是多少?
- 本地线程计数器ThreadLocalHoldCounter是用来做什么的?
- 缓存计数器HoldCounter是用来做什么的?
- 写锁的获取与释放是怎么实现的?
- 读锁的获取与释放是怎么实现的?
- RentrantReadWriteLock为什么不支持锁升级?
- 什么是锁的升降级? RentrantReadWriteLock为什么不支持锁升级?