目录
前言,关于AQS的源码介绍有很多参考,但是回归本源,为什么要设计AQS,它为实现解决并发的各类锁提供了基础,那它又是怎么处理和保证锁并发安全的呢,这里简要并深究的分析下其实现方式。内容供参考,欢迎领导和同志们指导和提出意见。(关于并发编程,个人推荐一个很优秀且教学清晰的up主:寒食君)
什么是AQS?
还是简要总结一下,AbstractQueuedSynchronizer(AQS)!类如其名,抽象的队列式的同步器。它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列。底层实现的数据结构是一个双向链表。基于CAS实现锁操作和并发安全。
AQS哪些过程会存在线程安全风险?
这里考虑一般的对竞争资源获取的情况,主要有哪些步骤?
1.争抢锁,必须保证有且只有一个线程可以获取锁。
2.未获取锁的线程要准确无误的进入队列等待及决定是否安全挂起。
3.获取锁的线程要可以安全的释放锁,并唤醒一个等待队列的线程。
AQS如何保证线程安全?
下面结合源码(ReentrantLock中的非公平锁NonfairSync )对上面这些过程进行分析:
争抢锁,必须保证有且只有一个线程可以获取锁:关键词 state 变量
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
/** 尝试获取锁,不成功则加入等待队列
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 争抢锁资源,非公平锁实现方法如下:nonfairTryAcquire()
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
...
/**争抢锁资源,通过cas操作获取锁
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
...
}
主要过程为 205 行,通过使用compareAndSetState原子操作,保证有且只有一个线程可以加锁成功。
未获取锁的线程要准确无误的进入队列等待及决定是否安全挂起:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
/** 尝试获取锁,不成功则加入等待队列
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 加入等待队列,等待获取锁
selfInterrupt();
}
...
/**让线程有序安全的进入等待队列,保证线程安全进入队列
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
611 if (compareAndSetTail(pred, node)) { // 原子操作入队
pred.next = node;
return node;
}
}
enq(node);
return node;
}
...
/** 加入队列会存在失败的情况,故自旋直至加入成功为止
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
...
/**在队列中等待获取锁资源,准备挂起,并在唤醒后继续尝试获取锁,自旋直至成功
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
...
/**判断当前线程是否应该挂起 (这里也比较重要,这里保证了挂起的线程会被获取锁的线程唤醒)
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/* 这里也是一个关键步骤,阻塞线程会设置前置节点的waitStatus标识为SIGNAL,并在设置成
* 功后,才进行挂起。前置节点的线程释放锁后,会根据此值判断是否需要唤醒下一个线程。
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
主要过程为 addWaiter()方法 611 行以及enq()方法,通过使用compareAndSetTail,compareAndSetHead原子操作,保证有序安全入队。
获取锁的线程可以安全的释放锁,并唤醒一个等待队列的线程:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
/**释放锁,并在完全释放锁(锁可重入)后,唤醒等待队列的线程
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
1263 if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
...
/**唤醒可能挂起的等待队列的下一个节点线程
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
...
}
注意到release()方法中 1263 行,会对节点的waitStatus标识进行判断,若!=0,则尝试唤醒下一节点线程。如入队过程中 shouldParkAfterFailedAcquire()方法中所示,下一节点挂起时,会在修改前置节点的waitStatus状态成功后才会进行挂起。故若下一节点挂起,则可以保证进行唤醒操作。
那么这样就是安全可靠的吗?这里并没有使用类似compareAndSetPark操作,考虑上面的情况:若是shouldParkAfterFailedAcquire之后,该线程还未挂起,但是前一个节点已经调用唤醒该线程的unpark操作,随后该线程执行park操作,那么是不是该线程便无人可以唤醒了?
这里需要了解下park与unpark,可参考附录链接;park与unpark操作是基于_counter(计数器)、 _mutex(互斥量)、_cond(条件变量)3个角色实现。简单来说,unpark操作将_counter设置为1,会唤醒线程。park操作将_counter设置为0,且判断是否_counter原值为1,若是则不会阻塞线程。大概类似于先吃后付款,先付款后吃都要给饭吃,不能白白unpark。
AQS主要是通过CAS操作保证线程安全,并有一套完善的逻辑保证资源的获取和释放。
附言:AQS的知识点和设计巧妙之处有很多,这里只是大致的分析了AQS如何实现锁及保证线程安全,还有许多知识需要仔细分析,比如 AQS 唤醒时为何从尾部开始查找?(线程安全问题)新建第一个阻塞队列节点时为什么加入一个空的线程头节点?(哨兵节点)ReentrantLock非公平锁与公平锁的区别?(实际上非公平指的是唤醒的线程与新线程抢锁的不公平,队列内的线程算是公平的)
休息一波开个盲盒(谨慎点击):
http://p6.itc.cn/images01/20201108/b9b0d238a3d145a18e81a51298a0c248.gif
参考链接地址:
AQS这样学就很简单了 - 知乎
Java并发编程——LockSupport的park和unpark - 简书
LockSupport的park/unpark分析_u013978512的博客-CSDN博客
Java AQS unparkSuccessor 方法中for循环为什么是从tail开始而不是head_womeiyouzaihuashui的博客-CSDN博客_unparksuccessor为什么从尾部
https://blog.51cto.com/u_15127639/2873074