ReentrantReadWriteLock是Java中读写锁的一种实现,他可以让多个线程同时读取共享资源——读共享,写入的时候只允许一个线程独占——写独占.了解过ReentrantLock的应该知道这个是一个可重入锁,允许一个线程可以多次获取相同类型的锁.在读操作多于写操作的场景下,读写锁能够提高读操作的并发性,同时保证写操作的独占性.
读锁的使用
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
ReadLock readLock = reentrantReadWriteLock.readLock();
readLock.lock();
new ReentrantReadWriteLock()的时候源码
public ReentrantReadWriteLock(boolean fair) {
// 根据传进来的参数,来选择是否为公平锁
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
继续往下看,公平锁/公平锁都会继承Sync对象,这个对象的构造方法
Sync() {
//
readHolds = new ThreadLocalHoldCounter();
// 这里有一个关于volatile的知识点:
// 为什么说ensures visibility of readHolds?
// 首先获取当前state的值,然后再将获取的state值赋值给state
// state是被volatile修饰的,对于一个volatile变量的写操作,会保证其前面对普通变量的写操作对volatile写操作是可以见的.
// 即:当第二个操作是volatile写时,无论第一个操作是什么,都不会发生指令重排
setState(getState()); // ensures visibility of readHolds
}
先看下ThreadLocalHoldCounter类
// ThreadLocal 的子类,其实就是一个ThreadLoca
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
// 重写了ThreadLocal的initialValue方法
public HoldCounter initialValue() {
return new HoldCounter();
}
}
再来看下HoldCounter类
static final class HoldCounter {
// 持有的读锁数
int count = 0;
// 往里面看可以看到这里获取线程id并不是简单的使用Thread.getId()方法,
// 而是 UNSAFE.getLongVolatile(thread, TID_OFFSET);
// 根据注释可以知道,Thread.getId()不是被final修饰的,
// 子类是可以重写该方法,这样就会导致映射不唯一.
final long tid = getThreadId(Thread.currentThread());
}
下面来看看读锁的源码
public void lock() {
// sync是new ReentrantReadWriteLock()的时候进行初始化的.
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
// 尝试获取锁,当返回大于0的时候,代表获取锁成功
if (tryAcquireShared(arg) < 0)
// 获取锁失败,入队
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 获取state
int c = getState();
// exclusiveCount(c):
// 方法是Sync中的静态方法,用来判断是否有写锁(独占锁),后面会具体分析
if (exclusiveCount(c) != 0 &&
// exclusiveOwnerThread是AbstractOwnableSynchronizer的属性,
// 记录了当前获取写锁的线程
getExclusiveOwnerThread() != current)
// 来到这里说明:a.有独占写锁,
// b.持用写锁的并不是当前线程
// 无法获取读锁,需要去排队
// 否则,如果有独占锁,并且持有写锁的是当前线程,就可以获取写锁,即锁降级
return -1;
// sharedCount(c):
// 和exclusiveCount(c)一样,是Sync的静态方法,用来判断读锁的状态
int r = sharedCount(c);
// readerShouldBlock():判断是否需要进入到阻塞队列,后面会详细分析
if (!readerShouldBlock() &&
r < MAX_COUNT &&
// SHARED_UNIT: 1左移16位,相当于c的高16位+1,后面会讲为什么会左移16位
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// r = 0 说明此线程是第一个加读锁的线程
// firstReader: Sync的属性,用来记录当前第一个获取读锁的线程
// firstReaderHoldCount: Sync的属性,用来记录当前第一个获取读锁的线程获取锁的次数
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// firstReader == current:说明当前线程为第一个获取读锁的线程,直接count直接+1就行
firstReaderHoldCount++;
} else {
// 来到这里说明:当前线程并不是firstReader记录的线程
// cachedHoldCounter: Sync的属性,用来缓存最后一个获取读锁的信息的
// HoldCounter:已经在上面讲过了,用来管理线程和与之对应的读锁的重入次数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
// 设置cachedHoldCounter缓存,通过ThreadLocal获取
// readHolds:其实就是ThreadLocal,上面讲过在Sync的构造方法执行的时候,进行初始化的.
// 这里有get,那么就会有对应的set,后面会讲
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 来到这里说明:cachedHoldCounter里边缓存了当前线程获取锁的记录,
// 但是可能是通过初始化readHolds.get()初始化获取的,因此count = 0
readHolds.set(rh);
// 给当前线程的读锁的重入次数+1
rh.count++;
}
// 获取读锁成功
return 1;
}
// 1.readerShouldBlock()方法之后需要进去到阻塞队列
// 2.r超出了最大值
// 3.竞争锁失败(cas失败)
return fullTryAcquireShared(current);
}
在分析fullTryAcquireShared(current);之前需要对上面的部分代码进行解释.
在解释之前,需要学习下ReentrantReadWriteLock的实现原理
ReentrantReadWriteLock是读写可重入锁,相比ReentrantLock来说,都是通过state来实现锁的状态,大家都知道ReentrantLock的state=0代表可加锁,state>0,state等于几就代表重入的次数.
要想通过state一个属性来表示是否可以加锁和重入次数,以及是读锁还是写锁该如何实现呢?
ReentrantReadWriteLock里边,将 state 这个 32 位的 int 值分为高 16 位和低 16位,分别用于共享模式和独占模式。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int exclusiveCount(int c) {
// 取c的低16位
return c & EXCLUSIVE_MASK;
}
static int sharedCount(int c) {
// 取c的高16位
return c >>> SHARED_SHIFT;
}
readerShouldBlock()方法分为来公平方式和非公平方式.
FairSync:
final boolean readerShouldBlock() {
// 看队列中是否有在排队的,false——>不需要排队,ture——>需要排队
return hasQueuedPredecessors();
}
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
NonfairSync:
final boolean readerShouldBlock() {
// 按道理非公平锁是不需要排队的,这里有一个优化,防止写锁饥饿等待
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
// 如果头节点的下一个节点不是共享模式的节点(即是写操作),
// 则即使当前有读锁,也需要进去等待队列进行排队
!s.isShared() &&
s.thread != null;
}
下面来看最后一个方法:
fullTryAcquireShared(current),再次尝试获取锁.
上面列举了会执行该方法的三种情况:
1.需要进去到阻塞队列
2.r超出了最大值
3.竞争锁失败(cas失败)
(1)对于第一种情况,如果是因为在NonfairSync情况下,虽然head.next是获取写锁的,并且它等待了很久,我可以不和他抢,但是我是重入读锁,那就只能对不起了.
(2)对于第三种情况,因为cas失败,如果就此返回,就要进入到阻塞队列里了,会有些不甘心,因为已经满足了第一种情况了,所以进入到这个方法其实是增加CAS成功的机会.
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
// 自璇
for (;;) {
// 获取当前的state
int c = getState();
// 再来判断一次,可能由其他情况进入到这个方法中的.
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// 就是上面说的第一种情况,需要判断是否是重入的
if (firstReader == current) {
// firstReader记录的是当前线程,满足重入条件
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
// 防止是因为上面readHolds.get();初始化出来的
readHolds.remove();
}
}
if (rh.count == 0)
// count == 0,显然不符合重入条件
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 处理重入
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}