0
点赞
收藏
分享

微信扫一扫

AQS源码解读(四)——Condition原理详解(Object#wait/notify优化?singnal唤醒线程了吗?)



天青色等烟雨,而我在等你,微信公众号搜索:​徐同学呀​,持续更新肝货,快来关注我,和我一起学习吧~


更多JUC源码解读系列文章请持续关注​​JUC源码解读文章目录JDK8​​!


文章目录


  • ​​一、前言​​
  • ​​二、代码示例​​
  • ​​三、Condition实现原理​​

  • ​​1、Condition单向队列​​
  • ​​2、ConditionObject#await​​


​​(1)fullyRelease完全释放锁​​​​(2)isOnSyncQueue判断节点是否在AQS同步队列​​​​(3)checkInterruptWhileWaiting判断唤醒过程中是否发生中断​​​​(4)acquireQueued重新获取锁​​​​(5)unlinkCancelledWaiters剔除取消节点​​​​(6)reportInterruptAfterWait对中断做出补偿​​

  • ​​3、ConditionObject#awaitUninterruptibly​​
  • ​​4、ConditionObject#await(long, java.util.concurrent.TimeUnit)​​
  • ​​5、ConditionObject#awaitNanos(long nanosTimeout)​​
  • ​​6、ConditionObject#awaitUntil(Date deadline)​​
  • ​​7、ConditionObject#signal​​
  • ​​8、ConditionObject#signalAll​​

  • ​​四、总结​​


一、前言

​Condition​​​的功能类似于​​wait/notify​​​,可用于实现生产者消费者模式,在阻塞队列中有着广泛实践。但是有了​​wait/notify​​​为什么还要有​​Condition​​呢?

​wait/notify​​​必须和​​synchronized​​​配套使用,正因为如此,​​wait/notify​​​作用的对象和​​synchronized​​的对象是同一个,无法区分队列空和队列满的情况,导致生产者可以唤醒消费者,也可以唤醒其他生产者,反之亦然。

​Condition​​​的出现就是为了解决​​wait/notify​​​不能针对性唤醒的问题。​​Condition​​​也是需要和​​Lock​​​配套使用,所以​​Condition​​​也是​​Lock​​​的一部分。​​Condition​​​是如何解决​​wait/notify​​的问题的呢?

二、代码示例

如下利用​​Condition​​实现了一个简易的生产者和消费者模式。将集合容量设置为1,生产者和消费者将交替运行。


  • ​notFull​​​表示生产者的状态,​​notFull.await()​​​负负为正表示集合满状态,生产者阻塞,停止生产;​​notFull.signal()​​表示集合不满,生产者唤醒,继续生产。
  • ​notEmpty​​​表示消费者状态,​​notEmpty.await()​​​负负为正表示集合空状态,消费者阻塞,停止消费;​​notEmpty.signal()​​表示集合不空,消费者唤醒,继续消费。

public class Test {
public static void main(String[] args) throws InterruptedException {
final int maxCapacity = 1;
List<Integer> store = new ArrayList<>(maxCapacity);
Lock lock = new ReentrantLock();

Condition notEmpty = lock.newCondition();
Condition notFull = lock.newCondition();

Thread producer = new Thread(new Runnable() {
@Override
public void run() {
for (;;) {
lock.lock();
try {
if (store.size() >= maxCapacity) {
try {
System.out.println("store 满了,生产者阻塞,不满等待notFull.wait()");
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int rand = new Random().nextInt();
store.add(rand);
System.out.println("store 没有满,生产者生产,并唤醒消费者notEmpty.signal()");
notEmpty.signal();
} finally {
lock.unlock();
}
}
}
});
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
for (;;) {
lock.lock();
try {
if (store.size() == 0) {
try {
System.out.println("store 空了,消费者阻塞,不空等待notEmpty.wait()");
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
store.remove(0);
System.out.println("store不空,消费者消费, 不满唤醒notFull.signal()");
notFull.signal();
} finally {
lock.unlock();
}
}
}
});
producer.start();
consumer.start();
}
}

//控制台输出
store 没有满,生产者生产,并唤醒消费者notEmpty.signal()
store 满了,生产者阻塞,不满等待notFull.wait()
store不空,消费者消费, 不满唤醒notFull.signal()
store 空了,消费者阻塞,不空等待notEmpty.wait()
store 没有满,生产者生产,并唤醒消费者notEmpty.signal()
store 满了,生产者阻塞,不满等待notFull.wait()
store不空,消费者消费, 不满唤醒notFull.signal()
store 空了,消费者阻塞,不空等待notEmpty.wait()
store 没有满,生产者生产,并唤醒消费者notEmpty.signal()
store 满了,生产者阻塞,不满等待notFull.wait()
... ...

三、Condition实现原理

​Condition​​​是一个接口,它的实现类是​​AbstractQueuedSynchronizer​​​的一个内部类​​ConditionObject​​​。从源码可以看出​​ReentrantLock​​​和​​ReentrantReadWriteLock.WriteLock​​​有Condition的功能且都是调用了AQS的​​ConditionObject​​​,所以其实现逻辑是一样的;​​ReentrantReadWriteLock.ReadLock​​不支持Condition的功能。

//ReentrantLock#newCondition
public Condition newCondition() {
return sync.newCondition();
}
//ReentrantLock.Sync#newCondition
final ConditionObject newCondition() {
return new ConditionObject();
}
//ReentrantReadWriteLock.WriteLock#newCondition
public Condition newCondition() {
return sync.newCondition();
}
//ReentrantReadWriteLock.Sync#newCondition
final ConditionObject newCondition() {
return new ConditionObject();
}
//ReentrantReadWriteLock.ReadLock#newCondition
public Condition newCondition() {
throw new UnsupportedOperationException();
}

1、Condition单向队列

​ConditionObject​​​中维护了一个单向链表队列。​​firstWaiter​​​相当于head,​​lastWaiter​​相当于tail。

入队列流程如下:


  1. 新建一个节点node,​​node.thread=Thread.currentThread() ;node.waitStatus=CONDITION​​。
  2. 若等待队列为空时,​​firstWaiter​​​和​​lastWaiter​​都指向新节点node。
  3. 等待队列不为空,node拼接到队列尾部,​​lastWaiter​​指针后移到node。

public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
//相当于head
private transient Node firstWaiter;
/** Last node of condition queue. */
//相当于tail
private transient Node lastWaiter;
public ConditionObject() { }

/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();//清除取消节点
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
//空的 firstWaiter和 lastWaiter都指向 node
// firstWaiter=lastWaiter=null 是空
// firstWaiter=lastWaiter!=null 说明只有一个节点
firstWaiter = node;
else
//tail != null,将节点拼接到尾部
t.nextWaiter = node;
//lastWaiter指针后移到新节点
lastWaiter = node;
return node;
}
}

AQS源码解读(四)——Condition原理详解(Object#wait/notify优化?singnal唤醒线程了吗?)_ConditionObject

了解了​​ConditionObject​​​的单向队列原理,​​await()​​​和​​signal()​​的原理也就容易理解了。

2、ConditionObject#await

​await()​​​是在一个线程持有锁的情况下调用,所以整体是线程安全的,同时​​await()​​​会响应线程中断。如下是​​await()​​基本流程:


  1. 将当前线程赋值给新节点并拼接到队列尾部。
  2. 完全释放锁​​fullyRelease​​,使得当前线程不再持有锁,state=0。
  3. 当node节点不在AQS同步队列中,则阻塞当前线程。
  4. 线程被唤醒有可能是其他线程调用了unpark,也有可能是发生中断导致唤醒,中断唤醒跳出循环,自然唤醒则循环一次判断节点是否在AQS同步队列中,在则终止循环。
  5. 若唤醒循环结束,重新获取锁。
  6. 无论获取锁成功与否,若唤醒的过程发生过中断,最后都需要对中断做出补偿​​reportInterruptAfterWait​​。

public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//1.放入condition队列-尾
Node node = addConditionWaiter();
//2.释放所有锁 state--->0,完全释放锁失败抛出异常不会继续往下走
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//3.不在aqs的同步队列中,则继续阻塞
LockSupport.park(this);
//4.被唤醒,判断unpark过程中是否发生过中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//5.acquireQueued 返回false为拿锁成功,返回true是阻塞被唤醒或者被中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
//不是唤醒前被中断 那就是唤醒后被中断
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

(1)fullyRelease完全释放锁

调用await和调用​​java.lang.Object#wait()​​​一样都要先完全释放锁,如果完全释放锁失败则将node节点状态设置为取消​​Node.CANCELLED​​​,也就有了剔除​​codition​​​队列取消节点的操作​​unlinkCancelledWaiters​​。

final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取重入次数
int savedState = getState();
//完全释放锁
if (release(savedState)) {
failed = false;
//返回重入次数
return savedState;
} else {
//释放锁失败,直接抛出异常
throw new IllegalMonitorStateException();
}
} finally {
//如果完全释放锁失败,则将node节点状态设置为取消。
//所以就有了剔除codition队列取消节点的操作
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

(2)isOnSyncQueue判断节点是否在AQS同步队列


  • ​node.waitStatus=CONDITION​​​,node在​​ConditionObject​​等待队列。
  • ​node.prev=null​​,node没有前继节点,不在AQS的同步队列。
  • ​node.next != null​​,node有后继节点,node必须在AQS同步队列。
  • 以上都不满足的情况下,从AQS的尾部开始遍历查询是否有node节点。

final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on aqs queue
return true;
//从aqs 尾开始遍历查找 node
return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

(3)checkInterruptWhileWaiting判断唤醒过程中是否发生中断

线程从​​await​​​中唤醒,需要判断唤醒的过程中是否发生过中断,若发生过中断,需要进一步判断是唤醒后中断(​​REINTERRUPT = 1​​​),还是唤醒前中断(​​THROW_IE = -1​​)?

/** Mode meaning to reinterrupt on exit from wait */
//唤醒后被中断
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
//唤醒前中断
private static final int THROW_IE = -1;

private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

(1)node可以从CONDITION修改为0,说明node还在condition队列中,需要将node放入AQS同步队列,并返回true,代表唤醒前(调用​​singal​​前)发生的中断。

(2)如果node不能可以从condition修改为0,则判断node是否在AQS中,不在则自旋让出cpu资源(等其他线程调用singal将node移动到AQS同步队列),在则返回false,代表唤醒后(调用​​singal​​后)被中断。

final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//可以从condition修改为0,说明node还在condition队列中,需要将node放入AQS队列,
//并返回true,代表唤醒前发生的中断。
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
//如果node不能可以从condition修改为0,则判断node是否在AQS中,不在则自旋让出cpu
//在则返回false,代表唤醒后被中断
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

(4)acquireQueued重新获取锁

需要注意acquireQueued返回值代表的是,是否发生中断,acquireQueued是自旋获取锁,虽然会有阻塞,但是唤醒后依然自旋获取锁,直至获取锁成功才返回,所以其既然能执行完毕返回就代表获取锁成功了。

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//拿node的前一个节点
final Node p = node.predecessor();
//若p是头节点,,说明自己排在队列的第一个尝试抢锁
if (p == head && tryAcquire(arg)) {
//node成为新的head
setHead(node);
p.next = null; // help GC
failed = false;
//拿到锁了返回false
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//基本不可能走到这一步,除非是系统级别的异常导致获取锁失败for循环意外退出,
cancelAcquire(node);
}
}

(5)unlinkCancelledWaiters剔除取消节点

​if (node.nextWaiter != null)​​如果node还有后继,说明condition队列中还有其他节点则从头遍历执行剔除取消节点的操作。

private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
//trail指向的是上一个正常的节点
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
//从头遍历
t = next;
}
}

(6)reportInterruptAfterWait对中断做出补偿

对这两种中断做出相应的补偿措施,唤醒前中断(​​THROW_IE = -1​​​),抛出异常;唤醒后中断(​​REINTERRUPT = 1​​),作为补偿再自己中断一下自己。

private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

3、ConditionObject#awaitUninterruptibly

​awaitUninterruptibly()​​​和​​await()​​逻辑类似,区别在于不会响应线程中断,不会对中断做出补偿,也不会做剔除取消节点的操作。

//不会响应中断
public final void awaitUninterruptibly() {
//加入condition双向队列
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
//线程中断不会跳出循环
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
//补偿机制
selfInterrupt();
}

4、ConditionObject#await(long, java.util.concurrent.TimeUnit)

​await(long, java.util.concurrent.TimeUnit)​​​可以传递两个参数时长和时间类型,同时也可响应中断。代码结构上和​​await()​​相似,只是加入了时间因素,而时间因素就用在了自旋的过程中:


  1. 通过传入的时长​​time​​,计算出deadline。
  2. 自旋如果超时​​nanosTimeout <= 0L​​​,则主动将node放入AQS同步队列,此时node并没有移出condition队列,只是将状态从​​Node.CONDITION​​​改为0,会在后面​​unlinkCancelledWaiters​​时移出condition队列。
  3. 自旋还未超时,则会给一个自旋的阈值(​​spinForTimeoutThreshold = 1000L​​​),自旋1000纳秒后未跳出自旋则阻塞线程一段时长(​​parkNanos(this, nanosTimeout)​​)。
  4. 线程阻塞时长到,自动唤醒,继续自旋,判断node是否在AQS同步队列,判断是否超时。

public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
//时间到,自动唤醒,放入aqs队列
timedout = transferAfterCancelledWait(node);
break;
}
//自旋一定时间1000纳秒后,阻塞
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

注:​​await(long, java.util.concurrent.TimeUnit)​​返回结果是是否超时,true为未超时,false为超时。

5、ConditionObject#awaitNanos(long nanosTimeout)

​awaitNanos(long nanosTimeout)​​​与​​await(long, java.util.concurrent.TimeUnit)​​​类似,也和时间有关。​​awaitNanos(long nanosTimeout)​​​只能传入一个纳秒的时长,返回是超时剩余时长(​​deadline - System.nanoTime()​​)。

public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
//时间到,自动唤醒,放入aqs队列
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//响应中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}

6、ConditionObject#awaitUntil(Date deadline)

​awaitUntil​​也是和时间有关,只是这个时间不是一个时间段,而是一个日期,或者说是一个时间戳。自旋过程没有给定一个阈值,若未到截止日期,则线程会阻塞,在没有其他线程唤醒或者中断的情况下,线程会到截止日期后自动唤醒。

public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
//时间到,自动唤醒,放入aqs队列
timedout = transferAfterCancelledWait(node);
break;
}
//abstime绝对时长,是个时间戳
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

7、ConditionObject#signal

唤醒是从队头开始唤醒,同样是天生的线程安全。

public final void signal() {
if (!isHeldExclusively())
//持有锁的不是当前线程
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

唤醒的逻辑在​​doSignal()​​中:


  1. first移出condition等待队列(​​first.nextWaiter = null​​)。
  2. ​first.waitStatus​​设置为0。
  3. first入AQS同步队列。
  4. 设置first的前继节点的​​waitStatus​​​为​​SIGNAL​​,等待其他线程释放锁的时候真正唤醒。

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;//等待队列空的 无需唤醒

first.nextWaiter = null;
//first != null,这里的唤醒只是将first取出等待队列,放入aqs队列的尾部,
// 并不会unpark
//除非aqs前继节点被取消或者signal状态设置失败,会直接唤醒first节点的线程
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//1.修改状态,改不了就是已经被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

//2.入aqs同步队列
Node p = enq(node);
int ws = p.waitStatus;
//前继节点 ws > 0代表取消
//设置前继节点的ws为SIGNAL,预示 其后继节点要被唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//前继节点取消 or ws设置 SIGNAL 失败,将立即唤醒node节点的线程
LockSupport.unpark(node.thread);
return true;
}

何为唤醒?正常情况下, ​​signal()​​​不会真正唤醒线程,而是将condition队列的头结点移到AQS的同步队列中,等待某个线程释放锁的时候真正唤醒。除非放入AQS同步队列后,其前继节点被取消或者状态设置为​​SIGNAL​​失败,才会直接唤醒first节点的线程。

8、ConditionObject#signalAll

​signalAll()​​​唤醒等待队列中的所有节点,唤醒过程和​​signal()​​类似。

public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//唤醒所有
doSignalAll(first);
}
private void doSignalAll(Node first) {
//清空等待队列
lastWaiter = firstWaiter = null;
//循环唤醒
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

四、总结


  • ​Condition​​必须和​​Lock​​配套使用,线程只有获取锁后才能调用​​Condition​​的​​await​​或者​​singal​​。
  • ​await​​过程中会完全释放锁。
  • 正常情况下​​signal​​不会唤醒​​condition​​队列中的节点,只是将头节点移到了AQS同步队列中,并设置其前驱状态为​​SIGNAL​​,等待其他线程释放锁后正在唤醒。

PS: ​如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!

AQS源码解读(四)——Condition原理详解(Object#wait/notify优化?singnal唤醒线程了吗?)_JDK8_02



举报

相关推荐

0 条评论