0
点赞
收藏
分享

微信扫一扫

【AQS】ReentrantLock可重入锁源码解析


AbstractQueuedSynchronizer

AQS全程为 AbstractQueuedSynchronizer、抽象队列同步器。是j.u.c包下的一个抽象类,同时也是juc下并发同步工具的基础。像ReentrantLock,Semaphore,CountDownLatch都是基于AQS实现的。

CAS

CAS全程为 Compare And Set 或 Compare And Swap。即比较并交换。是AQS实现的基础。CAS在实现的时候,有三个值。一个是内存值 V,一个是期望值E,一个是更新值 U,当且仅当 V = E的时候,才会叫V更新为U。CAS 底层依赖与汇编指令 ​​cmpxchg​​. 这是一个CPU级别的指令,下面我们看看Linux和Windows下的c代码

我们可以通过OpenJdk源码看到如何定义的【jdk版本jdk-b64】

​​JDK7的源码 Linux CAS​​

linux_x86版本C++实现

\hotspot\src\os_cpu\linux_x86\vm\atomic_linux_x86.inline.hpp
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}

Windows版本C++实现

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint*  dest, jint compare_value) {
// alternative for InterlockedCompareExchange
int mp = os::is_MP();
__asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}

首先会有一个方法 ​​os::is_MP​​​, 判断是否为多核的【multiple processor】。如果是多核,则需要使用lock指令。即 使用 ​​lock cmpxchg​​​指令来执行CAS操作,而如果是单核的则直接使用 ​​cmpxchg​​​指令即可。​​lock​​ 指令在执行的后面的指令的时候总线,后面优化成锁缓存。

用处:在Java中Unsafe类中定义了很多CAS操作,都是Native方法,Native方法即本地方法,调用的是JDK中的C++代码。具体实现在上面。而AQS中大量使用到了CAS操作。

ReentrantLock

这里通过ReentrantLock来学习AQS底层源码!

@Test
public void test(){
ReentrantLock lock = new ReentrantLock();
lock.lock();
try{
//....
}finally{
lock.unlock();
}
}

上面创建了ReentrantLock可重入锁。我们先看公平锁:

FairSync 中的 lock()

final void lock() {
this.acquire(1);
}

调用acquire方法,这个方法是定义在AQS中的。

acquire()

public final void acquire(int var1) {
// tryAcquire()尝试获取锁,如果获取锁不成功,则会执行后面的acquireQueued方法。
if (!this.tryAcquire(var1)
&& this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), var1)) {
selfInterrupt();
}
}

下面我们先看看​​tryAcquire​​ 方法,会调用实现来自己的方法,即公平锁中定义的tryAcquire方法

tryAcquire()

protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取AQS的状态
int c = getState();
// 0表示未被占有。即处于空闲状态,可以进行获取锁。
if (c == 0) {
// hasQueuedPredecessors 判断当前队列中是否有线程正在排队。
// !hasQueuedPredecessors() 与非公平的区别,需要判断等待队列中是否存在其他等待节点,如果存在,则返回true。
// 当hasQueuedPredecessors()返回true是,表示队列中有等待的节点。此时整个tryAcquire方法返回false,表示
// 获取锁失败,如果返回false,表示等待的节点,则表示可以使用CAS获取锁,如果获取成功,则设置独占锁的线程为
// 当前线程,如果CAS失败,则返回false. 进行入队。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {// compareAndSetState(0, acquires)抢锁。使用CAS修改同步器状态。
// 设置独占锁所占用的线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 判断当前线程是否等于互斥锁占有者。如果是,表示当前锁是被自己占有的,可以进行重入操作。
else if (current == getExclusiveOwnerThread()) {
// 这里可以看到加上acquires。并不是设置为1。所以,当我们进行多次lock()操作的时候,也要进行对应次数的unlock()
// 否则锁无法成功被释放。
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

我们看一下这个方法​​hasQueuedPredecessors​​:

hasQueuedPredecessors()

public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 判断队列头尾是否相同。 下面图示讨论了h.next为null的情况,如果不为null,则判断头结点的下一个节点
// 线程是否是自己,如果不是。则需要进行排队。
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

【AQS】ReentrantLock可重入锁源码解析_结点

这个方法也是与非公平锁的区别之一,即在获取锁的时候,如果当前锁没有被占有,线程不会判断队列中是否有等待者,而是直接先尝试CAS获取锁。下面我们看看两者的区别。

【AQS】ReentrantLock可重入锁源码解析_java_02

总结:【非公平地方有两处】
1. 在lock()方法里,公平锁直接调用AQS的acquire()方法,而非公平锁先尝试CAS获取锁,如失败,再调用acquire方法
2. 在acquire方法中,会调用实现类的tryAcquire方法,而公平锁或调用hasQueuedPredecessors()方法判断队列中是否有线程在等待,而非公平锁会直接使用CAS尝试获取锁。如果失败,则进行入队操作。

上面我们看完了非公平锁NonFair中的​​tryAcquire()​​​方法,我们继续回到AQS中的​​acquire()​​方法

public final void acquire(int arg) {
//获取锁失败,则需要进行入队。addWaiter()
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

如果此时tryAcquire(arg)返回成功,即获取到了锁。则执行结束。表示获取锁成功。

如果此时tryAcquire(arg)返回失败,表示获取锁失败了,则会执行acquireQueued方法。我们先看addWaiter()方法,该方法就是线程节点入队方法。

addWaiter()

private Node addWaiter(Node mode) {
// 构造一个新节点节点。waiter为当前线程,mode为独占类型[Node.EXCLUSIVE]。
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;//尾结点
// 如果尾结点不为null,则表示有节点在队列中,则可以直接使用CAS进行入队尝试
if (pred != null) {
//节点入队操作是非原子性的。分为一下3步。
// 1.节点的prev指针执行尾结点。
// 2. 设置当前节点为尾结点
// 3. 设置原为节点的next指针指向当前节点
node.prev = pred;//step1
if (compareAndSetTail(pred, node)) {//step2
pred.next = node;//step3
return node; // 如果入队成功,返回当前线程构建的节点。
}
}
// 如果尾结点为null,或者是第一次尝试入队失败了,则需要进入enq方法、
enq(node);// 入队(尾插法)
return node;//返回当前线程构建的节点
}

具体解析看注释就行了,下面我们看看enq方法:

private Node enq(final Node node) {
// 自旋[死循环] 保证节点100%入队成功。
for (;;) {
Node t = tail;
if (t == null) {
// 如果队列此时为null,则构造一个空节点,并将头尾指针指向这个空节点
if (compareAndSetHead(new Node())) // 使用CAS设置new Node()节点为头节点。
tail = head;//头尾节点设置为一个
} else {
node.prev = t;// 1.先设置前置节点的头结点为尾结点
if (compareAndSetTail(t, node)) {//2.然后使用CAS设置tail尾结点尾当前节点。
t.next = node; // 3. 最后设置原尾结点的下一个节点为当前节点
return t;//退出自旋
}
}
}
}

通过addWaiter()方法,我们就把节点成功入队了。下面看看acquireQueued()方法。

acquireQueued()

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;// 表示是否成功拿到资源
try {
boolean interrupted = false;// 标记等待过程中是否被中断过
// 自旋
for (;;) {
// 获取当前节点的前一个节点。
final Node p = node.predecessor();
// 判断当前节点的前一个节点为头结点。如果是,则执行tryAcquire 尝试抢占锁。
// 如果抢占锁成功。则设置当前节点为头结点。剔除原有的头结点。
// [需要将当前节点设置为头结点,原head节点出队操作]
if (p == head && tryAcquire(arg)) {
setHead(node);//设置当前节点为头结点。并将thread设置null,prev设置为null
p.next = null; // help GC
failed = false;// 成功获取资源
return interrupted;// 返回等待过程中是否被中断过。
}
// 如果当前节点不是第一个或者抢占锁失败,则执行shouldParkAfterFailedAcquire
// 如果自己可以休息了,就通过park()进入waiting状态,知道被unpack();如果不可中断的情况被中断了。
// 那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// parkAndCheckInterrupt()返回true,病史线程在阻塞过程中,被中断过。
// 但是此时只是表示当前线程被唤醒了,即un
interrupted = true;//如果等待过程中被中断过,就将interrupted标记为true
}
} finally {
if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
cancelAcquire(node);
}
}

这里我们看看这个方法​​shouldParkAfterFailedAcquire​​; 从名字我们可以知道,即获取锁失败之后应该要park;

shouldParkAfterFailedAcquire()

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;// 获取node前驱节点的状态waitStatus;
if (ws == Node.SIGNAL) // 当节点的前一个节点的waitStatus为signal时,返回true。SIGNAL=-1
// 已经告诉前驱拿完号后通知自己了,那么就可以安心Park了
return true;
if (ws > 0) {
//cancelled大于0.表示节点已经取消了,则需要去除取消的节点。[异常情况或主动中断情况会修改为cancelled]
/**
* 如果前驱节点取消了,那就一直往前找。直到找到最近一个正常等待的状态,并排在它的后面。
* 注意:那些取消的节点,由于被自己“加塞”到它们前面了,它们相当于形成了一个无引用链,稍后会被GC回收。
*/
do {
// 从后往前,把所有取消的节点全部断链
// Node pred = pred.prev;
// node.prev = pred;
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;//连接成一个新的链
} else {
// 此时在ReentrantLock中,waitStatus状态只有0,表示我需要去标注上一个节点。
// 但是不能立即Park(). 调用者需要重新去确认在park()前不能获取到锁、
// 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完后通知自己一下。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//把节点状态修改为-1
}
return false;
}

park()自己,并在别唤醒的时候,检查阻塞过程中是否中断过、

parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
// 阻塞线程。下次被unPark()时,会从这里继续执行。
// 返回Thread.interrupted(),返回线程中断状态。如果线程阻塞过程中,被中断过,返回true,并且会将中断标记清除。
LockSupport.park(this);//挂起线程,下次会继续从此次执行。
// interrupted()是静态方法:内部实现是调用当前线程的isInterrupted().并且会重置当前线程的中断状态。
// isInterrupted()是实例方法,是调用该方法的对象锁表示的那个线程的isInterrupted(),不会重置当前线程的中断状态。
return Thread.interrupted();//返回线程是否被中断,并且会将中断状态复位[归零]。
}

被唤醒之后,别唤醒并不代表你一定可以获取到锁。所以已经需要重复之前CAS抢锁过程,如果失败,需要再次park();

如果此时获取到了锁,并且被中断过,则在acquire方法中再次中断自己。即打一个中断标记。让调用这自己考虑该如何处理。

public final void acquire(int arg) {
//获取锁失败,则需要进行入队。addWaiter()
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();//中断当前线程。
}

当然,我们可以看到有一个finally块,如果是失败的情况下,需要执行cancelAcquire方法。

正常的acquire方法是不会失败的。只有设置了等待时间。或者中断可以抛异常的情况,会失败。

lockInterruptibly()

public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//判断当前线程是否已经被中断了,如果已经中断,则抛出中断异常InterruptedException;
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 当前线程如果已被中断,则抛出中断异常。
}
} finally {
if (failed)
cancelAcquire(node);
}
}

tryLock(long timeout, TimeUnit unit)

这个方法既可以设置阻塞时间,也可以响应中断异常。

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout; // 计算好deadline时间
final Node node = addWaiter(Node.EXCLUSIVE);// 节点入队
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) // 超过等待时间,并且还是为获取到锁,则返回false;
return false;
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);//挂起当前线程nanosTimeout纳秒;
if (Thread.interrupted()) // 判断当前线程是否挂起。
throw new InterruptedException();// 抛出中断异常
}
} finally {
if (failed)
// return false. 和 throw new InterruptedException 会维持failed为true。即
// 都是属于异常情况,表示当前线程获取锁异常,不再等待抢占锁,
// 所以将当前线程封装的节点的waitStatus设置为CANCELLED;
cancelAcquire(node);
}
}

以下就是在发生阻塞超时和被中断的情况下,需要取消掉当前节点。

private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null; // 第一步:清空持有的线程

// 将节点断开,从当前节点往前,去掉前面所有的取消节点。
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// pred = pred.prev;
// node.prev = pred;

//记录node前驱节点的next指针指向。
Node predNext = pred.next;

// 设置节点的waitStatus为CANCELLED。
node.waitStatus = Node.CANCELLED;

// 判断node节点是否为尾结点。如果是尾结点,则将node的前驱节点设置为尾结点。
// 被唤醒的都是头结点的下一个节点,如果此时节点又是尾结点,此时表示后面没有节点。直接想当前节点从队列中去除即可。
if (node == tail && compareAndSetTail(node, pred)) {
// CAS设置新的tail节点的next指针设置为null;
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// pred 被取消的节点的前驱节点。
// node 被取消的节点
// pred节点不为head && (前驱节点waitStatus不为SIGNA或者设置为SIGNAL成功) && pred所持有的线程不为null;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next; // node节点的next指针。
if (next != null && next.waitStatus <= 0)
//设置node.next为pred.next; 即将node节点断链,从队列中移除。
compareAndSetNext(pred, predNext, next);
} else {
// 如果pred节点为头结点。或者pred节点的状态不为-1并且设置waitStatus为-1失败。
// 或者pred节点的thread为null。
// 则唤醒node节点的下一个节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}

unparkSuccessor()

下面就是唤醒节点的代码了。

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 如果头结点的状态不为0,则需要更新为0;
// 为什么需要将waitStatus修改为0.因为在唤醒之后,在非公平锁的情况下,被唤醒的节点可能还是无法
// 抢占到锁的,所以有可能重新park(); 那我们看看park()的条件
// if (shouldParkAfterFailedAcquire(p, node) &&
// parkAndCheckInterrupt())
// shouldParkAfterFailedAcquire{
// compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//把节点状态修改为-1
//}
// 在park前,需要将waitState修改为-1;即只能park-1的节点。
// 在ReentrantLock下。此时ws只能为0.才会走到上面的语句 compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//获取头结点的下一个节点。
if (s == null || s.waitStatus > 0) {
// 但是遇到s==null时,表示我们可能遇到了一种中间状态,即next节点还没有连接好,所以我们可以从后往前找
// 如果s节点已经取消。则需要断链。
s = null;
// 倒序遍历到node后的最近一个不为CANCALLED的节点。
/**
* 重点:为什么需要从后往前遍历节点。
* 我们可以看到下面。node节点入队的时候。并不是原子性的。
* 即:当我们执行到step2的时候,此时tail节点是指向了后一个节点。
* 但是前驱节点还没有指向node,如果此时unpark()执行,从前往后执行,会导致这个节点
* 无法遍历到,即无法被唤醒、
*private Node addWaiter(Node mode) {
* // 构造一个节点。waiter为当前线程,mode为排它锁类型[Node.EXCLUSIVE]。
* Node node = new Node(Thread.currentThread(), mode);
* // Try the fast path of enq; backup to full enq on failure
* Node pred = tail;//尾结点
* if (pred != null) {
* //前置节点设置不是同步的,
* node.prev = pred;//step1
* // CAS设置当前节点为尾结点
* if (compareAndSetTail(pred, node)) {//step2
* pred.next = node;//step3
* return node;
* }
* }
* enq(node);// 入队(尾插法)
* return node;//返回当前线程构建的节点
* }
*/
// node节点是头结点,从后往前遍历,找到最后一个,即从左往右第一个waitStatus<=0的节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 用unpark()唤醒等待队列中最前边的那个未取消线程
LockSupport.unpark(s.thread);// 唤醒s节点锁持有的线程[即唤醒最前面一个有效的节点]
}

release方法:

public final boolean release(int arg) {
if (tryRelease(arg)) { // 清空锁的占有线程,并将state恢复为0;
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒不为0的节点。即非0表示后一个节点是可以被唤醒的。
unparkSuccessor(h);
return true;
}
return false;
}

到此,就把ReentrantLock的lock()方法和unlock()方法看完了。菜鸟刚研究AQS源码,还有很多不懂的。望个人仁兄指教。


举报

相关推荐

0 条评论