0
点赞
收藏
分享

微信扫一扫

AQS源码解读(五)——从acquireShared探索共享锁实现原理,何为共享?如何共享?



天青色等烟雨,而我在等你,微信公众号搜索:​徐同学呀​,持续更新肝货,快来关注我,和我一起学习吧~


更多JUC源码解读系列文章请持续关注​​JUC源码解读文章目录JDK8​​!


文章目录


  • ​​一、前言​​
  • ​​二、共享模式获取锁​​

  • ​​1、doAcquireShared入队列,自旋,阻塞​​

  • ​​(1)setHeadAndPropagate传播唤醒后继​​
  • ​​(2)判断是否应该阻塞和阻塞线程​​


  • ​​三、共享模式可中断获取锁​​
  • ​​doAcquireSharedInterruptibly入队列,自旋,阻塞,响应中断​​
  • ​​四、共享模式可超时获取锁​​
  • ​​doAcquireSharedNanos入队列,自旋,阻塞,自动唤醒,响应中断​​
  • ​​五、共享模式释放锁​​
  • ​​doReleaseShared唤醒后继​​
  • ​​六、总结​​


一、前言

何为共享锁?共享锁就是多个线程可以共享一把锁,如​​ReentrantReadWriteLock​​​的​​ReadLock​​​是共享锁,​​Semaphore​​​是共享锁,​​CountDownLatch​​​是共享锁,且这三个都是基于​​AQS​​实现的。

在AQS中共享锁和独占锁一样,也实现了一套通用的模板,子类只需要实现如何获取锁(​​tryAcquireShared​​​),如何释放锁(​​tryReleaseShared​​)的逻辑。

二、共享模式获取锁

共享模式获取锁的过程和独占锁非常类似,都是先获取锁,失败之后进入同步队列操作。​​tryAcquireShared​​​在AQS中没有给出具体实现,在子类​​ReentrantReadWriteLock.ReadLock​​​、​​Semaphore​​​和​​CountDownLatch​​中相应实现了获取共享锁的逻辑。

​tryAcquireShared​​​返回值小于0,表示获取共享锁失败,从而进入同步队列操作​​doAcquireShared​​。

public final void acquireShared(int arg) {
//tryAcquireShared 返回-1获取锁失败,返回值大于1或者0获取锁成功
if (tryAcquireShared(arg) < 0)
//获取锁失败,进入队列操作
doAcquireShared(arg);
}

1、doAcquireShared入队列,自旋,阻塞

​doAcquireShared​​​的逻辑和​​独占锁​​​​acquireQueued​​非常类似,入队列、自旋、阻塞等。基本流程如下:


  1. 新建共享节点node,并入队列(​​addWaiter(Node.SHARED)​​)。
  2. 自旋判断新节点node前驱是否是head,是则获取锁(​​tryAcquireShared​​​),获取锁成功并传播唤醒后继(​​setHeadAndPropagate​​)。
  3. 新节点node前驱不是head,则判断是否应该阻塞​​shouldParkAfterFailedAcquire​​。
  4. 判断应该阻塞,则阻塞当前线程​​parkAndCheckInterrupt​​。
  5. 被其他线程唤醒,或中断导致唤醒,则自旋重复2、3、4、5。

private void doAcquireShared(int arg) {
//创建一个读节点,并入队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//如果前继节点是head,则尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取锁成功,设置新head和共享传播(唤醒下一个共享节点)
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
/**
* p不是头结点 or 获取锁失败,判断是否应该被阻塞
* 前继节点的ws = SIGNAL 时应该被阻塞
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

(1)setHeadAndPropagate传播唤醒后继

自旋判断新节点node前驱是head,且尝试获取共享锁成功,则需要传播唤醒共享后继节点。基本流程如下:


  1. node出队列,并成为新的head。
  2. 判断是否满足传播条件。
  3. 如若node还有后继且后继是共享节点,则执行唤醒操作​​doReleaseShared​​。

(共享锁的传播模式比较复杂,如需深入了解,请阅读拙作​​《AQS源码解读(七)——从PROPAGATE和setHeadAndPropagate()分析共享锁的传播性》​​)

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//设置node为新head
setHead(node);
//一连串判断满足后,唤醒共享后继
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//唤醒后继共享节点
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

(2)判断是否应该阻塞和阻塞线程

共享锁判断判断是否应该阻塞和阻塞线程的代码和​​独占锁​​​的完全一样,不过多赘述。​​shouldParkAfterFailedAcquire​​主要做了如下几步:


  1. 判断node前驱状态是否为​​SIGNAL​​,是则直接返回true。
  2. node前驱状态不是​​SIGNAL​​​,有可能是​​ws>0​​,说明前驱取消了,自旋跳过取消的节点,并寻找链接一个正常的前驱。
  3. node前驱状态不是​​SIGNAL​​​,有可能是0(初始化状态)或​​PROPAGATE​​​(传播状态),修改node前驱状态为​​SIGNAL​​。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
* node拿锁失败,前继节点的状态是SIGNAL,node节点可以放心的阻塞,
* 因为下次会被唤醒
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* pred节点被取消了,跳过pred
*/
do {
//pred = pred.prev;
//node.pred = pred;
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//跳过取消节点,给node找一个正常的前驱,然后再循环一次
pred.next = node;
} else {
/* 0 -3
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

​shouldParkAfterFailedAcquire​​​判断返回true,则调用​​parkAndCheckInterrupt​​阻塞当前线程。

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

三、共享模式可中断获取锁

​acquireSharedInterruptibly​​​在代码结构上与​​doAcquireShared​​​类似,不同之处在于​​doAcquireShared​​​不响应中断,​​acquireSharedInterruptibly​​响应中断。

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
//被打断 抛出异常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
//获取锁失败
doAcquireSharedInterruptibly(arg);
}

doAcquireSharedInterruptibly入队列,自旋,阻塞,响应中断

​doAcquireSharedInterruptibly​​​逻辑上也和​​doAcquireShared​​​类似,但是​​doAcquireSharedInterruptibly​​​在线程阻塞期间,被中断导致唤醒,将抛出异常​​InterruptedException​​​,响应中断,此时​​failed=true​​​,​​finally​​​块中会执行取消节点的操作,取消过程和​​独占锁​​一样。

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//新建节点入队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//node节点的前驱节点是head,获取锁
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//在阻塞期间因为中断而唤醒,抛异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

四、共享模式可超时获取锁

和​​独占锁​​​的可超时获取锁类似,共享模式超时获取锁,不仅可以响应中断,还可以将线程阻塞一段时间,自动唤醒。​​tryAcquireSharedNanos​​​可传入一个纳秒单位的时间​​nanosTimeout​​​,可超时的逻辑在​​doAcquireSharedNanos​​中。

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}

doAcquireSharedNanos入队列,自旋,阻塞,自动唤醒,响应中断

​doAcquireSharedNanos​​​和​​doAcquireShared​​逻辑类似,但是不仅可以响应中断,同时还可以让线程阻塞一段时间自动唤醒,如果超时了还没获取锁则返回false。

​doAcquireSharedNanos​​​还有一个非常不同之处,就是即使​​shouldParkAfterFailedAcquire​​​判断应该阻塞了,也有可能不阻塞,还会再自旋一段时间,这个自旋的时长有一个阈值​​spinForTimeoutThreshold = 1000L​​,1000纳秒,自旋了1000纳秒后还没有获取锁,且此时也判断应该阻塞了,就让线程休眠一段时间。

线程唤醒,有可能是自动唤醒,有可能是被其他释放锁的线程唤醒,还有可能是线程中断导致唤醒,如果是线程中断唤醒,则需要响应中断,抛出异常​​InterruptedException​​;如果没有中断,则继续循环刚才的流程(判断前驱是否是head,判断是否超时,判断是否需要阻塞等)。

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
//判断是否应该阻塞
//在阈值spinForTimeoutThreshold范围自旋一定时长,
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//阻塞一定时间,时间到可自动唤醒
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
//线程如若发生中断,响应中断抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

五、共享模式释放锁

共享模式释放锁的逻辑很简单,​​tryReleaseShared​​​释放锁成功后,调用​​doReleaseShared​​​唤醒后继节点。​​tryReleaseShared​​需要子类自行实现。

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//读锁释放才唤醒后继节点
doReleaseShared();
return true;
}
return false;
}

doReleaseShared唤醒后继

共享锁释放和在AQS同步队列中的共享节点获取锁,传播唤醒时都调用了​​doReleaseShared​​​,其唤醒后继是一个自旋的过程(​​for(;;)​​),其基本流程如下:


  1. 获取head,判断队列是否为空。
  2. 不为空,则继续判断head的状态是否是​​SIGNAL​​,即是否可以唤醒后继。
  3. head的状态为​​SIGNAL​​​,则将状态置为0。head状态置为0成功,则唤醒后节点线程​​unparkSuccessor​​,失败则自旋重试。
  4. 若head状态为0,则将其置为​​PROPAGATE​​,使得head具有传播唤醒的特性。
  5. 如若自旋的过程中head变了,则继续自旋唤醒后继,head没有变,跳出自旋。

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//SIGNAL --> 0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后继节点的线程
unparkSuccessor(h);
}
else if (ws == 0 &&
//0 --> PROPAGATE
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
/**
* we must loop in case a new node is added
* while we are doing this
*/
if (h == head) // loop if head changed
//head没有变则break
break;
}
}

六、总结


  • 共享锁的模板方法和​​独占锁​​代码结构上非常相似,判断是否应该阻塞,阻塞线程,取消节点代码通用。
  • 共享锁通用模板已经搭好,子类只需要实现如何获取锁(​​tryAcquireShared​​​),如何释放锁(​​tryReleaseShared​​)的逻辑。
  • 共享锁的共享体现在可以多个线程共享一把锁,通过传播模式,传播唤醒后继共享节点。传播性在共享锁获取和释放都有体现。

注意:共享锁的传播性,较为复杂,单独撰写一篇文章详细讲解,如需了解,请移步​​《AQS源码解读(六)——从PROPAGATE和setHeadAndPropagate()分析共享锁的传播性》​​。

因为共享锁的源码和独占锁的源码有重复部分,这里没有过多赘述,如需了解,请移步​​《AQS源码解读(二)——从acquireQueued探索独占锁实现原理,如何阻塞?如何唤醒?》​​

PS: ​如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!

AQS源码解读(五)——从acquireShared探索共享锁实现原理,何为共享?如何共享?_AQS



举报

相关推荐

0 条评论