AQS 详解及源码分析
1.概述
AQS 是什么?全称为 AbstractQueuedSynchronizer,是 JDK 中的一个抽象类。首先我们看看继承它的有哪些类:
基本上所有 JUC 并发包中的类都和它有关系,AQS 是用来构建锁或者其他同步器组件(读写锁等)的重量级基础框架及整个JUC体系的基石,通过内置的 FIFO 队列来完成对资源获取线程的排队工作,并通过一个 int 类型变量表示持有锁的状态。
通过 AQS 可以维持对共享资源的并发操作。
2.构成
通过这个 UML 图,可以得知 AQS 的来源和内部的组成,AQS 抽象类中还维护了两个内部类:Node 和ConditionObject 这两个内部类对于共享资源的维护其到关键的作用。Node(1) 用于数据封装、COnditionObject 用户状态维护,可以初步的认为 AQS 是一个由同步器和双向链表组成的 FIFO 队列。
如下图:
state 维护了对共享资源的状态,通过状态的定义,可以描述共享资源的状态,是被线程持有还是可以获取等状态。并且对 state 的更改具有原子性。
3.LockSupport
3.1 概述
研究 AQS 的源码前,需要先了解 LockSupport,用于创建锁和其他同步类的基本线程阻塞原语,线程等待唤醒机制的升级版。
LockSupport 类使用了一种名为 permit(许可)的概念来做到阻塞和唤醒线程的功能,每一个线程都有一个许可,许可(permit)只有两个值1和0,每一个线程默认为0。
可以把许可看成是一种(0, 1)信号量(semaphore),但与 Semaphore(JUC 的一个辅助同步类)不同的是,许可的累加的上线为1。
3.2 等待和唤醒方式
- 使用 Object 类中的 wait() 方法让线程阻塞,notify()、allnotify() 唤醒阻塞线程
- 使用 JUC 包中的 ReentrantLock 类中的 Condition 的 await() 方法让线程阻塞,single() 方法通知唤醒具体线程
- 使用 LockSupport 类可以阻塞当前线程和唤醒指定被阻塞线程 park() 和 unpark()
3.3 park() 和 unpark()
3.3.1 park()
当线程默认调用 park() 时,因为当前 permit 的默认值为0,那么当前线程就会阻塞,直到其他线程(unpark)将当前线程的 permit 设置为1时,阻塞线程才会被唤醒。然后将消耗掉一个 permit。
3.3.2 unpark()
调用 unpark() 方法后,指定线程的 permit 许可就会加1,但是多次调用是不会使 permit 增加(不会累加),会自动唤醒因park被阻塞的线程,并返回。
3.3.3 实例
public class LockSupportDemo {
public static void main(String[] args) {
Thread a = new Thread(() -> {
// 使线程阻塞
LockSupport.park();
// 等待被唤醒
System.out.println(Thread.currentThread().getName()+"\t 被唤醒了");
},"线程一");
// 这里我们让唤醒的线程后启动,测试是否需要先阻塞后唤醒
a.start();
Thread b = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
// unpark方法需要传递一个指定的线程
LockSupport.unpark(a);
System.out.println(Thread.currentThread().getName()+"\t 解锁线程一");
} catch (InterruptedException e) {
System.out.println(e);
}
},"线程二");
b.start();
}
}
/*
* 运行结果:
* 线程二 解锁线程一
* 线程一 线程一被唤醒了
* */
3.3.4 对比另外两种阻塞和唤醒方式
-
wait 和notify 不能脱离 synchronized 使用
-
notify 必须要使在wait后面执行,才可以成功唤醒
-
ReentrantLock 中的 await 和 single 也有上面同样的问题
-
使用 LockSupport 中的 park 和 unpark 不需要配合锁使用,实现单纯的将线程阻塞和唤醒,并且阻塞和唤醒没有必要的顺序
3.4 源码分析
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
// 查看 UNSAFE.unpark 和 UNSAFE.park
public native void unpark(Object var1);
public native void park(boolean var1, long var2);
底层就是通过 unsafe 类中的 native 代码,原语级别的控制。(1)
每次调用 park 就会消耗一次 permit,如果 permit 为0则没有可以消耗的,那么就会造成线程此刻阻塞。(默认为0)
unpark 就是发放给指定线程 permit,但是发放的 permit 不能累加,就像前面说的 permit 只有1、0两种状态。
可以让线程多次在不同的时间段进行阻塞(不用锁的加持),然后再指定时间再次发放 permit,唤醒线程,达到多次阻塞多次唤醒的目的。
4.源码分析
就如同前面的所说在 JUC 中的众多锁和同步器都继承自 AQS,因此 AQS 是 JUC 的重要的框架基石。
4.1 概述
AQS 是同步器加上数据结构中的队列来组成,队列在底层是一个虚拟双向队列。
原理概述:抢到资源的线程直接使用资源并进行处理业务逻辑,没有抢到的资源的线程必然涉及一种排队等候机制(队列实现)。抢占失败的线程继续排队等待,但等候的线程仍然保留获取锁的可能且获取锁的流程仍然在继续。
同步器会管理队列中等待的线程。
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁的分配。这个机制主要用的是 CHL(开头的有图,双向链表队列)队列的变体来实现,将暂时获取不到锁的线程加入到队列中,通过 AQS(有一个头结点和尾结点)来维护,这个队列就是 AQS 的抽象表现。
它将请求共享资源的线程封装成队列的结点(结点中有前后结点指向和 waitstate 标识等),通过 CAS、自旋以及LockSupport.park() 的方式,维护 state(volatile修饰的int类型)变量的状态(当变量状态大于0时就说明,当前有线程获取到了共享资源),使并发达到同步的控制结果。
4.2 过程详解
通过整个 ReentrantLock 中多个线程争抢共享资源的过程来分析 AQS 的如何实现对多线程并发下的管理。
####4.2.1 尝试加锁
第一个进入的线程会直接获取锁并设置共享资源对象的锁持有对象为当前线程,还会修改当前 state 值为1,表示资源已被占用。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
4.2.2 加锁失败
后面进入的线程发现共享资源对象已经有线程占用则会先进行尝试再次获取。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1.tryAcquire(arg)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
========
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 1.如果当前资源状态为 0,说明前面使用资源的线程已经完毕
if (c == 0) {
// 1.1 尝试加锁并将当前共享资源的获取线程设为自己
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 2.下面的判断是判断当前在操作共享资源的线程是不是这个在执行当前方法的线程。
// 2.如果是则会将标志位state上的值加 1(1)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果尝试获取失败,返回 false 那么就会执行后面的 acquireQueued() 方法,在进入前,系统会为等待队列新建一个节点(哨兵结点),不存储任何信息,帮助占位,后续才会是需要占用共享资源的线程进入,如图:
2.acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
addWaiter(Node.EXCLUSIVE) 方法,前面尝试再次获取锁失败以后,就会执行这个方法将当前线程加入到等待队列中,源码:
private Node addWaiter(Node mode) {
// 1.创建节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 2.获取队列尾节点
Node pred = tail;
// 3.判断当前队列是否为空
// 3.1 不为空
if (pred != null) {
// 3.1.1 将当前节点加入到队列尾部,通过 CAS 的方式
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 3.2 队列为空,执行 enq(node) 方法,初始化一个队列
enq(node);
return node;
}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg),进入队列以后,线程也不会安分(通过自旋再尝试获取锁),大概再尝试获取2次,如果还是获取不到资源,那么就会被调用 park 方法将当前线程进行阻塞,等待被唤醒。
并将哨兵节点的 waitstate 设置为 -1,源码:
final boolean acquireQueued(final Node node, int arg) {
// 默认再次尝试是失败的
boolean failed = true;
try {
// 默认不可中断
boolean interrupted = false;
// for(;;) 等同于 while(true),一种自旋
for (;;) {
final Node p = node.predecessor();
// 1.判断当前节点时候为队列的头节点,如果为头节点则进行尝试获取锁
if (p == head && tryAcquire(arg)) {
// 1.1 获取成功锁
setHead(node);
// 1.2 出队
p.next = null; // help GC
failed = false;
return interrupted;
}
// 2.如果获取失败,shouldParkAfterFailedAcquire(p, node) 则进行 park 阻塞处理
// 2.parkAndCheckInterrupt() 判断当前线程是否可被中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(p, node) 方法和 parkAndCheckInterrupt() 方法,通过哨兵节点的状态值进行对当前头节点的处理,获取不到锁就会被 park,被阻塞,源码:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
=========
private final boolean parkAndCheckInterrupt() {
// 阻塞该线程,等待被唤醒
LockSupport.park(this);
return Thread.interrupted();
}
至此,获取不到资源的线程才真的被阻塞进行排队等待。
4.2.3 解锁
当线程资源使用资源完毕后,则会进行解锁,Reentrant unlock() 方法源码:
public void unlock() {
sync.release(1);
}
release(1) 方法,在 AQS 抽象类中,进行真正的解锁操作,源码:
public final boolean release(int arg) {
// 1.尝试资源解锁
if (tryRelease(arg)) {
// 1.1 资源解锁后,获取头结点
Node h = head;
// 1.2 判断是否有等待的线程
if (h != null && h.waitStatus != 0)
// 1.3 解锁头结点
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(arg) 方法,对资源进行真正的解锁,源码:
protected final boolean tryRelease(int releases) {
// 1.获取资源状态 -1 的状态
int c = getState() - releases;
// 2.判断当前线程是否为持有资源的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 3.将资源状态设为 0
if (c == 0) {
// 3.1 设置资源解锁成功
free = true;
// 3.2 设置资源的持有线程为 null
setExclusiveOwnerThread(null);
}
// 4.设置资源状态
setState(c);
return free;
}
4.3 总结
-
没有线程占用共享资源,直接上锁
-
有则将后续需要抢占共享资源的线程会加入队列进行排队
-
-
用完以后解锁
- 队列中的队列会和当前执行完资源的线程一起抢占
- 队列中的线程如果抢占到资源,那么队列中原哨兵结点会出队,原第一个等待的结点成为新的哨兵结点
通过 AQS 知晓了 Java 中对强制共享资源的线程进行管理的方式,保证了并发问题,synchronized 和 ReentrantLock 默认都是非公平锁,后续线程的资源争夺是随机的,但是 ReentrantLock 在实例化时可以设置参数为 true 来设置为公平锁,那么就会按照队列进入顺序进行依次获得锁,会降低并发性能。
公平锁与非公平锁的区别就在于,ReentrantLock 执行加锁方法时,选择的内部类问题。
公平锁会选择 static final class FairSync extends Sync
内部类,而非公平锁会选择 static final class NonfairSync extends Sync
,从而到达等待队列在获取资源的方式。