一、AQS
AQS
:AbstractQueuedSynchronizer
,抽象队列同步器,是juc
的基础,如常用的ReentrantLock
、Semaphore
、CountDownLatch
、ReentrantReadWriteLock
等都是基于AQS
来进行设计的。
AQS
通过维护一个同步状态state
和一个双向队列CLH
来实现加锁释放锁。
CLH
队列中存放的Node
结点主要关注如下几个属性:
waitStatus
:等待状态,取值分别为CANCELLED = 1
,SIGNAL = -1
,CONDITION = -2
,PROPAGATE = -3
,以及初始时的0prev
:当前结点的前驱结点next
:当前结点的后继结点thread
:当前结点对应的线程
二、加锁
1、调用lock.lock()
进行加锁,lock()
方法内部调用sync.lock()
方法进行加锁,非公平锁中,sync
是NonfairSync
的实例,其具体加锁代码如下
final void lock() {
// 自旋,设置AQS中同步状态 state 的值,如果 state 为0,表示当前没有线程持有锁,则将其设置为1
if (compareAndSetState(0, 1))
// 设置当前线程持有排他锁,加锁结束
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果自旋失败,则表示已经有线程持有锁,state 的值不为0(大于0)
// 则通过排他模式获取锁,如果获取不成功,则进入队列
acquire(1);
}
2、acquire
方法,代码如下
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2.1、该方法的重点在于if判断,首先是 tryAcquire
,在子类NonfairSync
中实际调用的是nonfairTryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取同步状态
int c = getState();
// 若同步状态为0,表示当前锁处于空闲状态
if (c == 0) {
// 尝试抢占,这里有并发的可能,所以有可能会抢不到
if (compareAndSetState(0, acquires)) {
// 如果抢到了,那就设置持有者为当前线程,并返回true
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程已经持有了锁,则重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 当前线程未持有锁
return false;
}
通过上面代码上的注释分析,可以知道,tryAcquire
如果返回true,则表示当前线程持有了锁,如果未持有锁,则进入acquireQueued
2.2、acquireQueued
中首先会执行内层方法addWaiter(Node.EXCLUSIVE)
,将当前线程加入到等待队列队尾,Node.EXCLUSIVE
表示排他模式,对应的还有共享模式
private Node addWaiter(Node mode) {
// 构建当前线程对应的队列结点
Node node = new Node(Thread.currentThread(), mode);
// 获取队尾结点,如果是首次往等待队列中添加结点,则 tail 指向null
Node pred = tail;
if (pred != null) {
// 如果队列不为空,则尾插法,将当前线程结点放到队尾,如果入队成功了,就返回这个结点
// 这一步在存在并发的情况下可能会设置失败,那就会继续往下执行 enq 方法,通过自旋来将当前结点放置到队尾
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
// 自旋操作
for (;;) {
// 队尾结点
Node t = tail;
// 如果队尾结点为空,则表示队列为空,需要进行初始化
if (t == null) {
// 注意这一步,初始化设置头结点的时候,并不是将入队的 node 结点放入队首,而是new了一个新的结点
// 这时候,这个头结点其实是一个傀儡结点,或是哨兵结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尾插法,将当前线程结点放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
到了这里,可以知道,在执行了addWaiter
之后,队列中有两个结点,头结点自然是傀儡结点,其属性中,waitStatus
为0,thread
为null,pre
指向null,next
指向当前队列中的第一个真正的等待结点。
第二个结点才是队列中第一个真正的结点,也是队尾结点,此时其waitStatus
为0,pre
指向头结点(傀儡结点),next
指向null,thread
为对应的线程。
2.3、内层方法addWaiter
执行完后,执行外层方法acquireQueued
,代码如下
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 返回node的前驱结点,如果node是第一个等待结点,则返回的是头结点(傀儡结点)
final Node p = node.predecessor();
/*
如果node的前驱是头结点,则尝试获取锁,tryAcquire方法在上面已经分析过了
tryAcquire会尝试获取锁,但不一定会获取到,因为是非公平锁,
所以在尝试获取锁的时候,可能会被其他线程抢占
*/
if (p == head && tryAcquire(arg)) {
// 如果当前线程抢到了锁,则将当前线程对应的node设置为新的傀儡结点
/*
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
*/
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
如果当前结点的前驱不是头结点或没有抢到锁则进入if判断
shouldParkAfterFailedAcquire方法一看名字就知道作用了,用于判断是否应该在获取锁失败后将线程挂起
shouldParkAfterFailedAcquire如果返回true,说明当前线程需要被挂起
通过parkAndCheckInterrupt挂起当前线程,直到线程被唤醒后返回其中断状态
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 状态大于0,就只有一个取值,也就是CANCELLED,前驱结点取消等待,那就将其从队列中移除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前驱结点的状态设置为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
从shouldParkAfterFailedAcquire
的源码来看,队列中除了最后一个结点的waitStatus
是0外(初始状态为0),其余结点的waitStatus
都不是0。
private final boolean parkAndCheckInterrupt() {
// 挂起当前线程,挂起后,return不会执行,直到等到unpark后才会继续执行
LockSupport.park(this);
// 唤醒后,才会继续执行return语句
return Thread.interrupted();
}
看到LockSupport
就不得不提一下java中线程等待唤醒的几种方法
- 使用
Object
的wait()
方法让线程等待,使用notify()
或notifyAll()
进行唤醒 - 使用
JUC
包中Condition
的await()
方法让线程等待,使用signal()
方法唤醒线程
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
- 使用
LockSupport
的park()
让线程等待,使用unpark()
进行唤醒
前两种方法都是必须在锁里才能执行,synchronized
里面执行wait
和notify
,在lock
里面执行await
和signal
,如果没有锁,会报错,而且对加锁和解锁的顺序有要求,如果先解锁后加锁会导致卡死。
LockSupport
俗称锁中断,它不需要持有锁也可以进行加锁解锁,而且加锁解锁先后顺序无要求,可以先解锁后加锁也不会导致卡死。这里只是简单提一下,具体的还是得实操。
回到正题,通过acquireQueued
代码段中的注释分析可以知道,当node
结点对应线程进入此方法后,如果其前驱是头结点(傀儡结点)则它会尝试获取锁,获取到锁之后它就变成了新的头结点。
而如果其前驱不是头结点,或者前驱是头结点但没有抢占到锁,则它会被挂起,直到持有锁的线程释放锁后,并且其满足被唤醒条件后才会被唤醒(见后面的unlock
分析)又或者被中断,直到其成功抢到锁。
三、解锁
public void unlock() {
// 如果当前线程持有锁,则将持有数量-1,为0后释放锁
sync.release(1);
}
release
代码如下
public final boolean release(int arg) {
// 尝试释放锁,因为是可重入锁,所以锁的同步状态state可能会大于1
// 所以tryRelease执行后当前线程可能依旧持有锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒头结点的后继结点
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
执行的是ReentrantLock.Sync#tryRelease
,代码如下
protected final boolean tryRelease(int releases) {
// getState获取锁的状态,或者说加锁次数
int c = getState() - releases;
// 如果持有锁的不是当前线程,则抛出异常,即不允许释放其他线程的锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 锁的释放状态
boolean free = false;
if (c == 0) {
// 如果加锁次数为0,就表示锁已经完全被释放了
free = true;
// 没有线程持有锁
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
在release
中,如果锁被完全释放了,则会调用unparkSuccessor
来唤醒头结点的后继结点对应的线程,唤醒后,就回到了上面分析过的parkAndCheckInterrupt
中的返回语句继续争抢锁。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒头结点的后继结点对应线程
LockSupport.unpark(s.thread);
}
整个流程大致就是这样子,不得不感叹,设计AQS的几位大佬真的是了不起。