0
点赞
收藏
分享

微信扫一扫

AQS源码解读(七)——ReentrantReadWriteLock原理详解(读写锁是一把锁吗?如何一把锁两个状态?)



天青色等烟雨,而我在等你,微信公众号搜索:徐同学呀,持续更新肝货,快来关注我,和我一起学习吧~


更多JUC源码解读系列文章请持续关注​​JUC源码解读文章目录JDK8​​ !


文章目录


  • ​​一、前言​​
  • ​​二、ReentrantReadWriteLock基本结构​​
  • ​​三、ReentrantReadWriteLock.Sync是一把锁还是两把锁?​​
  • ​​四、锁的公平性​​

  • ​​1、NonfairSync​​
  • ​​2、FairSync​​

  • ​​五、读锁的获取与释放​​

  • ​​1、ReadLock#lock​​

  • ​​(1)tryAcquireShared获取共享锁​​
  • ​​(2)fullTryAcquireShared自旋获取共享锁​​
  • ​​(3)doAcquireShared进入同步队列操作​​
  • ​​(4)setHeadAndPropagate传播唤醒后继共享节点​​

  • ​​2、ReadLock#lockInterruptibly​​
  • ​​3、ReadLock#tryLock​​
  • ​​4、ReadLock#unlock​​
  • ​​ReentrantReadWriteLock.Sync#tryReleaseShared​​

  • ​​六、写锁的获取与释放​​

  • ​​1、WriteLock#lock​​
  • ​​ReentrantReadWriteLock.Sync#tryAcquire​​
  • ​​2、WriteLock#lockInterruptibly​​
  • ​​3、WriteLock#tryLock​​
  • ​​4、WriteLock#unlock​​

  • ​​七、总结​​


一、前言

​ReentrantReadWriteLock​​​作为读写锁,整体架构和实现都与​​ReentrantLock​​​类似,不同之处在于​​ReentrantReadWriteLock​​​分为读锁和写锁,读锁是共享锁,可多个读线程共享一把锁;写锁是互斥锁,只能有一个线程持有锁。同样​​ReentrantReadWriteLock​​也是基于AQS实现的。

二、ReentrantReadWriteLock基本结构

AQS源码解读(七)——ReentrantReadWriteLock原理详解(读写锁是一把锁吗?如何一把锁两个状态?)_ReadWriteLock

​ReentrantReadWriteLock​​​实现了接口​​ReadWriteLock​​​,​​ReadWriteLock​​内部又由两个Lock接口组成。

public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}

​ReentrantReadWriteLock​​​中有两个内部类​​ReentrantReadWriteLock.ReadLock​​​、​​ReentrantReadWriteLock.WriteLock​​​都实现了接口​​Lock​​​,分别实现了读锁和写锁的逻辑。而真正锁机制的实现逻辑都在内部类​​ReentrantReadWriteLock.Sync​​中。

public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;

public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
}

三、ReentrantReadWriteLock.Sync是一把锁还是两把锁?

​ReentrantReadWriteLock.Sync​​​继承自​​AbstractQueuedSynchronizer​​​,虽然​​ReentrantReadWriteLock​​分为

​ReadLock​​​和​​WriteLock​​​,却共享一个同步队列控制器​​Sync​​​,表面看是两把锁,实际上是一把锁,线程分为读线程和写线程,读读不互斥,读写互斥,写写互斥。既然共享一个​​Sync​​​,那就是共享一个​​state​​。源码巧妙的用一个变量state表示两种锁的状态:


  • 低16位记录写锁,高16位记录读锁。
  • 当​​state=0​​时,读线程和写线程都不持有锁。
  • 当​​state!=0​​​,​​sharedCount(c)!=0​​时表示读线程持有锁,返回值为持有锁的读线程数。
  • 当​​state!=0​​​,​​exclusiveCount(c)!=0​​时表示写线程持有锁,返回值写锁重入次数。

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/**
* 低16位写锁,高16位读锁
*/
static final int SHARED_SHIFT = 16;
// 每次获取读锁,高位就+1,相当于+SHARED_UNIT
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 1111 1111 1111 1111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count */
//共享锁(读锁)持有锁的读线程数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
//独占锁(写锁)重入的次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}

  • ​MAX_COUNT​​即是写锁的重入最大次数的界限,也是持有读锁的读线程的最大数量的界限。
  • ​EXCLUSIVE_MASK​​​作为写锁重入次数的掩码,但是感觉duck不必,写锁状态位本身是低位的,(​​exclusiveCount​​​)再与​​EXCLUSIVE_MASK​​​作​​&​​​运算得到写锁重入次数,不还是原值​​c​​吗?
  • 强调一点,写线程获取锁修改​​state​​​,就是正常的+1,可以理解为低16位+1;而读线程获取锁修改​​state​​​,是高16位+1,即为每次​​state+=SHARED_UNIT​​​,​​SHARED_UNIT​​​是一个很大数,每次读锁​​state​​加这么大个数,怎么是+1呢,这里就要理解高16位+1是在二进制下+1:

1000 0000 0000 0000
+
1000 0000 0000 0000
=
1 0000 0000 0000 0000
  • 如此​​sharedCount​​中c=state向右移16位就得到有多少个读线程持有锁了。

四、锁的公平性

​ReadLock​​​和​​WriteLock​​也区分公平锁和非公平锁,默认情况是非公平锁。

public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

​ReentrantReadWriteLock​​​中的​​NonfairSync​​​和​​FairSync​​​也都继承自​​ReentrantReadWriteLock#Sync​​​,但是并没有像​​ReentrantLock​​​中一样,分别实现获取锁的逻辑,而是分别实现了两种阻塞的策略,​​writerShouldBlock​​​和​​readerShouldBlock​​​。获取锁模板方法已经在​​ReentrantReadWriteLock#Sync​​中实现了。

1、NonfairSync


  • ​NonfairSync#writerShouldBlock​​ :写线程在抢锁之前永远不会阻塞,非公平性。
  • ​NonfairSync#readerShouldBlock​​:读线程抢锁之前,如果队列head后继(head.next)是独占节点时阻塞。

static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
//写线程在抢锁之前永远不会阻塞-非公平锁
return false; // writers can always barge
}
final boolean readerShouldBlock() {
* 读线程抢锁的时候,如果队列第一个是实质性节点(head.next)是独占节点时阻塞
* 返回true是阻塞
*/
return apparentlyFirstQueuedIsExclusive();
}
}
/**
* 判断qas队列的第一个元素是否是独占线程(写线程)
* @return
*/
//AbstractQueuedSynchronizer
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
final boolean isShared() {
return nextWaiter == SHARED;
}

2、FairSync

在​​FairSync​​,无论是写线程还是读线程,只要同步队列中有其他节点在等待锁,就阻塞,这就是公平性。

static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;

/**
* 同步队列中head后继(head.next)不是当前线程时阻塞,
* 即同步队列中有其他节点在等待锁,此时当前写线程阻塞
* @return
*/
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
/**
* 同步队列中排在第一个实质性节点(head.next)不是当前线程时阻塞,
* 即同步队列中有其他节点在等待锁,此时当前读线程阻塞
* @return
*/
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
//同步队列中head后继(head.next)是不是当前线程
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

五、读锁的获取与释放

​ReadLock​​​和​​WriteLock​​​分别实现了Lock的lock和unlock等方法,实际上都是调用的​​ReentrantReadWriteLock.Sync​​的中已经实现好的模板方法。

1、ReadLock#lock

读锁是共享锁,首先尝试获取锁​​tryAcquireShared​​​,获取锁失败则进入同步队列操作​​doAcquireShared​​。

//ReentrantReadWriteLock.ReadLock#lock
public void lock() {
sync.acquireShared(1);
}
//AbstractQueuedSynchronizer#acquireShared
public final void acquireShared(int arg) {
//tryAcquireShared 返回-1 获取锁失败,1获取锁成功
if (tryAcquireShared(arg) < 0)
//获取锁失败入同步队列
doAcquireShared(arg);
}

(1)tryAcquireShared获取共享锁

​tryAcquireShared​​​在​​ReentrantReadWriteLock#Sync​​中实现的。如下是获取共享锁的基本流程:


  1. 判断state,是否有线程持有写锁,若有且持有锁的不是当前线程,则返回-1,获取锁失败。(读写互斥)
  2. 若持有写锁的是当前线程,或者没有线程持有写锁,接下来判断读线程是否应该阻塞​​readerShouldBlock()​​。
  3. ​readerShouldBlock()​​区分公平性,非公平锁,队列head后继(head.next)是独占节点,则阻塞;公平锁,队列中有其他节点在等待锁,则阻塞。
  4. 读线程不阻塞且加锁次数不超过​​MAX_COUNT​​​且​​CAS​​​拿读锁成功​​c + SHARED_UNIT​​。
  5. 若​​r = sharedCount(c)=0​​​说明没有线程持有读锁,此时设置​​firstReader​​​为当前线程,第一个读线程重入次数​​firstReaderHoldCount​​为1。
  6. 若​​r = sharedCount(c)!=0​​​说明有线程持有读锁,此时当前线程是​​firstReader​​​,则​​firstReaderHoldCount​​+1。
  7. 若持有当前读锁的不是​​firstReader​​​,则​​HoldCounter​​来记录各个线程的读锁重入次数。
  8. 若因为​​CAS​​​获取读锁失败,会进行自旋获取读锁​​fullTryAcquireShared(current)​​。

//ReentrantReadWriteLock.Sync#tryAcquireShared
protected final int tryAcquireShared(int unused) {

Thread current = Thread.currentThread();
int c = getState();
//exclusiveCount(c) != 0 写锁被某线程持有,当前持有锁的线程不是当前线程,直接返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//写锁没有线程持有或者当前持有写锁的写线程可以继续拿读锁
int r = sharedCount(c);
//nonFairSync 队列第一个是写线程时,读阻塞,!false && 加锁次数不超过max && CAS拿读锁,高16位+1
//FairSync 当队列的第一个不是当前线程时,阻塞。。。
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) { //r==0说明是第一个拿到读锁的读线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { //第一个持有读锁的线程时当前线程,为重入
firstReaderHoldCount++;
} else {
//HoldCounter 记录线程重入锁的次数
//读锁 可以多个读线程持有,所以会记录持有读锁的所有读线程和分别重入次数
HoldCounter rh = cachedHoldCounter;
//rh==null 从readHolds获取
//rh != null rh.tid != 当前线程
if (rh == null || rh.tid != getThreadId(current))
//取出当前线程的cachedHoldCounter
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//compareAndSetState(c, c + SHARED_UNIT) 失败后 自旋尝试获取锁
return fullTryAcquireShared(current);
}

(2)fullTryAcquireShared自旋获取共享锁

自旋获取锁的过程与​​tryAcquireShared​​类似,获取读锁,记录重入,只不过加了一个循环,循环结束的条件是获取锁成功(1)或者不满足获取锁的条件(-1)。

final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
//写锁有线程持有,但是持锁的线程不是当前线程,返回-1,结束自旋。
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
//读线程应该被阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
//删除不持有该读锁的cachedHoldCounter
readHolds.remove();
}
}
if (rh.count == 0)
//当前线程不持有锁,直接返回-1
return -1;
}
}
//读线程不应该阻塞,判断state
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//获取读锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
//下面就是记录重入的机制了
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

(3)doAcquireShared进入同步队列操作

​tryAcquireShared(arg)​​返回-1,获取锁失败,则进入同步队列操作:


  1. 创建一个共享节点,并拼接到同步队列尾部。
  2. 获取新节点的前继节点,若是​​head​​,则尝试获取锁。
  3. 获取锁成功,唤醒后继共享节点并出队列。
  4. node的前继节点不是head,或者获取锁失败,判断是否应该阻塞(​​shouldParkAfterFailedAcquire​​​),应该阻塞​​parkAndCheckInterrupt​​阻塞当前线程。

private void doAcquireShared(int arg) {
//创建一个读节点,并入队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//如果前继节点是head,则尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取锁成功,node出队列,
//唤醒其后继共享节点的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
/**
* p不是头结点 or 获取锁失败,判断是否应该被阻塞
* 前继节点的ws = SIGNAL 时应该被阻塞
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

这里的​​setHeadAndPropagate()​​​是在获取共享锁成功的情况下调用的,所以​​propagate​​​>0,(​​tryAcquireShared​​​在​​Semaphore​​​中有返回0的情况,返回结果为资源剩余量)。若node的下一个节点是共享节点,则调用​​doReleaseShared()​​唤醒后继节点。

(4)setHeadAndPropagate传播唤醒后继共享节点


  • 首先获取锁的node节点赋值给head,成为新head。
  • 在​​ReetrantReadWriteLock​​中node获取锁成功只有可能是​​propagate > 0​​,所以后面新旧head判断会省略,可以暂时不用考虑。
  • 若node后面没有节点(调用​​doReleaseShared​​没多大意义),或者node后面有节点且是共享节点则会调用​​doReleaseShared()​​唤醒后继节点。

(共享锁的传播性,详解请移步​​《AQS源码解读(六)——从PROPAGATE和setHeadAndPropagate()分析共享锁的传播性》​​。)

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// propagate > 0 获取锁成功
// propagate < 0 获取锁失败,队列不为空,h.waitStatus < 0
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//唤醒后继共享节点
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

2、ReadLock#lockInterruptibly

可中断获取锁,顾名思义就是获取锁的过程可响应中断。​​ReadLock#lockInterruptibly​​​在获取锁的过程中有被中断(​​Thread.interrupted()​​​),则会抛出异常​​InterruptedException​​​,终止操作;其直接调用了AQS的模板方法​​acquireSharedInterruptibly​​。

(​​acquireSharedInterruptibly​​​和​​doAcquireSharedInterruptibly​​​详解请移步​​《AQS源码解读(五)——从acquireShared探索共享锁实现原理,何为共享?如何共享?》​​)

//ReentrantReadWriteLock.ReadLock#lockInterruptibly
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
//被打断 抛出异常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
//获取锁失败,进入队列操作
doAcquireSharedInterruptibly(arg);
}

3、ReadLock#tryLock

​ReadLock#tryLoc​​​尝试获取锁,调用的是​​ReentrantReadWriteLock.Sync​​​实现的​​tryReadLock​​​,获取锁成功返回true,失败返回false,不会进入队列操作,所以也不区分公平性。代码结构上和​​ReentrantReadWriteLock.Sync#tryAcquireShared​​​相似,所以不多赘述,不同之处是​​ReadLock#tryLoc​​本身就是自旋获取锁。

public boolean tryLock() {
return sync.tryReadLock();
}
//ReentrantReadWriteLock.Sync#tryReadLock
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
//判断是否有线程持有写锁
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//没有线程持有写锁or持有写锁的是当前线程,写锁-->读锁 锁降级
if (compareAndSetState(c, c + SHARED_UNIT)) {
//获取读锁成功
if (r == 0) {
//第一个读锁的线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//不是第一次获取读锁 但是firstReader是当前线程,重入
firstReaderHoldCount++;
} else {
//其他线程获取读锁,重入操作
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}

同样和​​ReentrantLock​​​一样,​​ReadLock#tryLock​​​也有一个重载方法,可传入一个超时时长​​timeout​​​和一个时间单位​​TimeUnit​​,超时时长会被转为纳秒级。

public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//AbstractQueuedSynchronizer#tryAcquireSharedNanos
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}

​tryLock(long timeout, TimeUnit unit)​​​直接调用了AQS的模板方法​​tryAcquireSharedNanos​​,也具备了响应中断,超时获取锁的功能:


  1. 若一开始获取锁​​tryAcquireShared​​​失败则进入AQS同步队列​​doAcquireSharedNanos​​。
  2. 进入同步队列后自旋1000纳秒,还没有获取锁且判断应该阻塞,则会阻塞一定时长。
  3. 超时时长到线程自动唤醒,再自旋还没获取锁,且判断超时则返回false。
  4. 自旋判断前驱是head,则尝试获取锁,获取成功,则出队,传播唤醒后继。

(​​tryAcquireSharedNanos​​​详解请看拙作​​《AQS源码解读(五)——从acquireShared探索共享锁实现原理,何为共享?如何共享?》​​)

4、ReadLock#unlock

读锁释放很简单,释放共享唤醒后继,无需区分公平性;其直接调用的是AQS的​​releaseShared​​​,​​ReentrantReadWriteLock​​​只需要实现​​tryReleaseShared​​即可。

public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//读锁释放唤醒后继节点
doReleaseShared();
return true;
}
return false;
}

ReentrantReadWriteLock.Sync#tryReleaseShared

​ReentrantReadWriteLock​​​中实现的​​tryReleaseShared​​​需要全部释放锁,才会返回true,才会调用​​doReleaseShared​​唤醒后继。

//ReentrantReadWriteLock.Sync#tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
/**
* 持有读锁的第一个线程是当前线程,且重入次数为1,释放锁将firstReader=null
* 否则 firstReaderHoldCount-1
*/
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
//读写都空闲了 才唤醒后面
return nextc == 0;
}
}

六、写锁的获取与释放

1、WriteLock#lock

写锁的lock和​​ReentrantLock​​​的lock逻辑类似都是调用​​AbstractQueuedSynchronizer#acquire​​​,区别在于​​tryAcquire​​的实现。

public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
//若没有抢到锁,则进入等待队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//自己中断自己
selfInterrupt();
}

ReentrantReadWriteLock.Sync#tryAcquire

​ReentrantLock​​​中​​tryAcquire​​​是在​​NonfairSync​​​和​​FairSync​​​中实现的,​​ReetrantReadWriteLock​​​是在​​Sync​​中实现的。

​ReetrantReadWriteLock​​中有读写锁,所以要考虑读写互斥的情况,即读锁被持有,将直接返回false,获取锁失败,如下是基本流程:


  1. ​c = getState() != 0​​,说明有线程持有读锁或者写锁。
  2. 继续判断​​w = exclusiveCount(c) = 0​​,则说明有线程持有读锁,直接返回false,获取锁失败。
  3. ​w = exclusiveCount(c) != 0​​,说明有线程持有写锁,判断持有锁的线程是否是当前线程,是就重入。
  4. 若​​c = getState() = 0​​​,没有线程持有锁,判断​​writerShouldBlock​​​,写线程是否应该阻塞,​​NonFairSync​​​中 写线程无论如何都不应该阻塞,则继续抢锁;​​FairSync​​中的只要同步队列中有其他线程在排队,就应该阻塞。
  5. 最后若获取锁成功会设置持锁的线程为当前线程。

//ReentrantReadWriteLock.Sync#tryAcquire
protected final boolean tryAcquire(int acquires) {

Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) { //c!=0 说明有读线程或者写线程持有锁
// (Note: if c != 0 and w == 0 then shared count != 0)
//w == 0 说明锁被读线程持有,w==0直接返回,抢锁失败,
//w != 0 判断当前线程是否持有锁,不是直接返回false,抢锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//w!=0 && current == getExclusiveOwnerThread 当前线程重入
//首先判断重入次数是否超过最大次数
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
//没有线程持有锁,写线程是否应该被阻塞,
// FairSync中的是只要线程中有其他线程在排队,就阻塞
// NonFairSync 中 写线程抢锁无论如何都不阻塞,直接抢
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//设置持锁的线程为当前线程
setExclusiveOwnerThread(current);
return true;
}

若​​tryAcquire​​​获取锁失败,则进入队列操作,此过程和​​ReentrantLock​​类似。

2、WriteLock#lockInterruptibly

​WriteLock#lockInterruptibly​​​可中断获取锁,其实现和​​ReentrantLock#lockInterruptibly​​​代码是一样的,都是调用​​AbstractQueuedSynchronize​​​中已经实现的模板方法​​acquireInterruptibly​​​和​​doAcquireInterruptibly​​​,​​tryAcquire​​​就是​​ReentrantReadWriteLock.Sync#tryAcquire​​。

public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
//有被中断 抛异常
throw new InterruptedException();
if (!tryAcquire(arg))
//doAcquireInterruptibly也会响应中断,抛异常
doAcquireInterruptibly(arg);
}

3、WriteLock#tryLock

​WriteLock#tryLock​​,尝试获取锁,获取锁成功就返回true,失败返回false,不会进入队列操作,所以不需要考虑公平性。

public boolean tryLock( ) {
return sync.tryWriteLock();
}
//ReentrantReadWriteLock.Sync#tryWriteLock
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
//有线程还有锁
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
//w=0,读锁被持有,直接返回false
//w!=0 写锁持有,但不是当前线程还有
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
//没有线程持有锁,或者持有写锁的是当前线程,继续获取锁
if (!compareAndSetState(c, c + 1))
return false;
//设置当前线程为持有线程
setExclusiveOwnerThread(current);
return true;
}

而其重载方法​​tryLock(long timeout, TimeUnit unit)​​​,代码和​​ReentrantLock​​​基本一样,就​​tryAcquire​​获取锁的逻辑不一样,这里不再过多赘述。

​tryLock(long timeout, TimeUnit unit)​​​直接调用了AQS的模板方法​​tryAcquireNanos​​,也具备了响应中断,超时获取锁的功能:


  1. 若一开始获取锁(​​ReentrantReadWriteLock.Sync#tryAcquire​​​)失败则进入AQS同步队列​​doAcquireNanos​​。
  2. 进入同步队列后自旋1000纳秒,还没有获取锁且判断应该阻塞,则会阻塞一定时长。
  3. 超时时长到线程自动唤醒,再自旋还没获取锁,且判断超时则返回false。

(​​tryAcquireNanos​​​详解请看拙作​​《AQS源码解读——从acquireQueued探索独占锁实现原理,如何阻塞?如何唤醒?》​​)

public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

4、WriteLock#unlock

写锁的释放和​​ReetrantLock​​中的锁释放代码逻辑一样,释放锁成功唤醒后继节点。

public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//释放锁成功后唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}

七、总结


  • ​ReentrantReadWriteLock​​读写锁基于AQS实现,读锁是共享锁,写锁是互斥锁。
  • ​ReentrantReadWriteLock​​​中的​​NonfairSync​​​和​​FairSync​​​分别实现了两种阻塞的策略,​​writerShouldBlock​​​和​​readerShouldBlock​​。
  • ​ReentrantReadWriteLock​​​中巧妙的用一个​​state​​变量记录两种锁的状态,低16位记录写锁,高16位记录读锁。
  • ​sharedCount(int c)==0​​​判断线程是否持有读锁,​​exclusiveCount(int c)==0​​判断线程是否持有写锁。
  • 读锁无法通过state记录锁重入,需要一个工具(​​HoldCounter​​​、​​ThreadLocalHoldCounter​​)专门记录持有读锁的各个线程的重入情况。
  • 在​​ReadLock#lock​​中获取读锁时,一个线程持有写锁时还可以再获得读锁,称为锁降级,但是没有锁升级。
  • 读写锁都是悲观锁,在读多写少的情况下,可能会出现写线程“饿死”的情况,即写线程一直获取不到锁。

PS: ​如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!



举报

相关推荐

七夕一把火 抢先版

0 条评论