AbstractQueuedSynchronizer
AQS全程为 AbstractQueuedSynchronizer、抽象队列同步器。是j.u.c包下的一个抽象类,同时也是juc下并发同步工具的基础。像ReentrantLock,Semaphore,CountDownLatch都是基于AQS实现的。
CAS
CAS全程为 Compare And Set 或 Compare And Swap。即比较并交换。是AQS实现的基础。CAS在实现的时候,有三个值。一个是内存值 V,一个是期望值E,一个是更新值 U,当且仅当 V = E的时候,才会叫V更新为U。CAS 底层依赖与汇编指令 cmpxchg. 这是一个CPU级别的指令,下面我们看看Linux和Windows下的c代码
我们可以通过OpenJdk源码看到如何定义的【jdk版本jdk-b64】
JDK7的源码 Linux CAS
linux_x86版本C++实现
\hotspot\src\os_cpu\linux_x86\vm\atomic_linux_x86.inline.hpp
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
Windows版本C++实现
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint*  dest, jint compare_value) {
  // alternative for InterlockedCompareExchange
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}首先会有一个方法 os::is_MP, 判断是否为多核的【multiple processor】。如果是多核,则需要使用lock指令。即 使用 lock cmpxchg指令来执行CAS操作,而如果是单核的则直接使用 cmpxchg指令即可。lock 指令在执行的后面的指令的时候总线,后面优化成锁缓存。
用处:在Java中Unsafe类中定义了很多CAS操作,都是Native方法,Native方法即本地方法,调用的是JDK中的C++代码。具体实现在上面。而AQS中大量使用到了CAS操作。
ReentrantLock
这里通过ReentrantLock来学习AQS底层源码!
@Test
public void test(){
ReentrantLock lock = new ReentrantLock();
lock.lock();
try{
//....
}finally{
lock.unlock();
}
}
上面创建了ReentrantLock可重入锁。我们先看公平锁:
FairSync 中的 lock()
final void lock() {
    this.acquire(1);
}调用acquire方法,这个方法是定义在AQS中的。
acquire()
public final void acquire(int var1) {
    // tryAcquire()尝试获取锁,如果获取锁不成功,则会执行后面的acquireQueued方法。
    if (!this.tryAcquire(var1) 
        && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), var1)) {
        selfInterrupt();
    }
}下面我们先看看tryAcquire 方法,会调用实现来自己的方法,即公平锁中定义的tryAcquire方法
tryAcquire()
protected final boolean tryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取AQS的状态
    int c = getState();
    // 0表示未被占有。即处于空闲状态,可以进行获取锁。
    if (c == 0) {
        // hasQueuedPredecessors  判断当前队列中是否有线程正在排队。
        // !hasQueuedPredecessors() 与非公平的区别,需要判断等待队列中是否存在其他等待节点,如果存在,则返回true。
        // 当hasQueuedPredecessors()返回true是,表示队列中有等待的节点。此时整个tryAcquire方法返回false,表示
        // 获取锁失败,如果返回false,表示等待的节点,则表示可以使用CAS获取锁,如果获取成功,则设置独占锁的线程为
        // 当前线程,如果CAS失败,则返回false. 进行入队。
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {// compareAndSetState(0, acquires)抢锁。使用CAS修改同步器状态。
            // 设置独占锁所占用的线程为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 判断当前线程是否等于互斥锁占有者。如果是,表示当前锁是被自己占有的,可以进行重入操作。
    else if (current == getExclusiveOwnerThread()) {
        // 这里可以看到加上acquires。并不是设置为1。所以,当我们进行多次lock()操作的时候,也要进行对应次数的unlock()
        // 否则锁无法成功被释放。
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}我们看一下这个方法hasQueuedPredecessors:
hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // 判断队列头尾是否相同。 下面图示讨论了h.next为null的情况,如果不为null,则判断头结点的下一个节点
    // 线程是否是自己,如果不是。则需要进行排队。
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
这个方法也是与非公平锁的区别之一,即在获取锁的时候,如果当前锁没有被占有,线程不会判断队列中是否有等待者,而是直接先尝试CAS获取锁。下面我们看看两者的区别。

总结:【非公平地方有两处】
1. 在lock()方法里,公平锁直接调用AQS的acquire()方法,而非公平锁先尝试CAS获取锁,如失败,再调用acquire方法
2. 在acquire方法中,会调用实现类的tryAcquire方法,而公平锁或调用hasQueuedPredecessors()方法判断队列中是否有线程在等待,而非公平锁会直接使用CAS尝试获取锁。如果失败,则进行入队操作。
上面我们看完了非公平锁NonFair中的tryAcquire()方法,我们继续回到AQS中的acquire()方法
public final void acquire(int arg) {
    //获取锁失败,则需要进行入队。addWaiter()
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}如果此时tryAcquire(arg)返回成功,即获取到了锁。则执行结束。表示获取锁成功。
如果此时tryAcquire(arg)返回失败,表示获取锁失败了,则会执行acquireQueued方法。我们先看addWaiter()方法,该方法就是线程节点入队方法。
addWaiter()
private Node addWaiter(Node mode) {
    // 构造一个新节点节点。waiter为当前线程,mode为独占类型[Node.EXCLUSIVE]。
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;//尾结点
    // 如果尾结点不为null,则表示有节点在队列中,则可以直接使用CAS进行入队尝试
    if (pred != null) {
        //节点入队操作是非原子性的。分为一下3步。
        // 1.节点的prev指针执行尾结点。
        // 2. 设置当前节点为尾结点
        // 3. 设置原为节点的next指针指向当前节点
        node.prev = pred;//step1
        if (compareAndSetTail(pred, node)) {//step2
            pred.next = node;//step3
            return node; // 如果入队成功,返回当前线程构建的节点。
        }
    }
    // 如果尾结点为null,或者是第一次尝试入队失败了,则需要进入enq方法、
    enq(node);// 入队(尾插法)
    return node;//返回当前线程构建的节点
}具体解析看注释就行了,下面我们看看enq方法:
private Node enq(final Node node) {
    // 自旋[死循环] 保证节点100%入队成功。
    for (;;) {
        Node t = tail;
        if (t == null) {
            // 如果队列此时为null,则构造一个空节点,并将头尾指针指向这个空节点
            if (compareAndSetHead(new Node())) // 使用CAS设置new Node()节点为头节点。
                tail = head;//头尾节点设置为一个
        } else {
            node.prev = t;// 1.先设置前置节点的头结点为尾结点
            if (compareAndSetTail(t, node)) {//2.然后使用CAS设置tail尾结点尾当前节点。
                t.next = node; // 3. 最后设置原尾结点的下一个节点为当前节点
                return t;//退出自旋
            }
        }
    }
}通过addWaiter()方法,我们就把节点成功入队了。下面看看acquireQueued()方法。
acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;// 表示是否成功拿到资源
    try {
        boolean interrupted = false;// 标记等待过程中是否被中断过
        // 自旋
        for (;;) {
            // 获取当前节点的前一个节点。
            final Node p = node.predecessor();
            // 判断当前节点的前一个节点为头结点。如果是,则执行tryAcquire 尝试抢占锁。
            // 如果抢占锁成功。则设置当前节点为头结点。剔除原有的头结点。
            // [需要将当前节点设置为头结点,原head节点出队操作]
            if (p == head && tryAcquire(arg)) {
                setHead(node);//设置当前节点为头结点。并将thread设置null,prev设置为null
                p.next = null; // help GC
                failed = false;// 成功获取资源
                return interrupted;// 返回等待过程中是否被中断过。
            }
            // 如果当前节点不是第一个或者抢占锁失败,则执行shouldParkAfterFailedAcquire
            // 如果自己可以休息了,就通过park()进入waiting状态,知道被unpack();如果不可中断的情况被中断了。
            // 那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // parkAndCheckInterrupt()返回true,病史线程在阻塞过程中,被中断过。
                // 但是此时只是表示当前线程被唤醒了,即un
                interrupted = true;//如果等待过程中被中断过,就将interrupted标记为true
        }
    } finally {
        if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
            cancelAcquire(node);
    }
}这里我们看看这个方法shouldParkAfterFailedAcquire; 从名字我们可以知道,即获取锁失败之后应该要park;
shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;// 获取node前驱节点的状态waitStatus;
    if (ws == Node.SIGNAL) // 当节点的前一个节点的waitStatus为signal时,返回true。SIGNAL=-1
        // 已经告诉前驱拿完号后通知自己了,那么就可以安心Park了
        return true;
    if (ws > 0) { 
        //cancelled大于0.表示节点已经取消了,则需要去除取消的节点。[异常情况或主动中断情况会修改为cancelled]
        /**
         * 如果前驱节点取消了,那就一直往前找。直到找到最近一个正常等待的状态,并排在它的后面。
         * 注意:那些取消的节点,由于被自己“加塞”到它们前面了,它们相当于形成了一个无引用链,稍后会被GC回收。
         */
        do {
            // 从后往前,把所有取消的节点全部断链
            // Node pred = pred.prev;
            // node.prev = pred;
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;//连接成一个新的链
    } else {
        // 此时在ReentrantLock中,waitStatus状态只有0,表示我需要去标注上一个节点。
        // 但是不能立即Park(). 调用者需要重新去确认在park()前不能获取到锁、
        // 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完后通知自己一下。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//把节点状态修改为-1
    }
    return false;
}park()自己,并在别唤醒的时候,检查阻塞过程中是否中断过、
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
    // 阻塞线程。下次被unPark()时,会从这里继续执行。
    // 返回Thread.interrupted(),返回线程中断状态。如果线程阻塞过程中,被中断过,返回true,并且会将中断标记清除。
    LockSupport.park(this);//挂起线程,下次会继续从此次执行。
    // interrupted()是静态方法:内部实现是调用当前线程的isInterrupted().并且会重置当前线程的中断状态。
    // isInterrupted()是实例方法,是调用该方法的对象锁表示的那个线程的isInterrupted(),不会重置当前线程的中断状态。
    return Thread.interrupted();//返回线程是否被中断,并且会将中断状态复位[归零]。
}被唤醒之后,别唤醒并不代表你一定可以获取到锁。所以已经需要重复之前CAS抢锁过程,如果失败,需要再次park();
如果此时获取到了锁,并且被中断过,则在acquire方法中再次中断自己。即打一个中断标记。让调用这自己考虑该如何处理。
public final void acquire(int arg) {
    //获取锁失败,则需要进行入队。addWaiter()
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
static void selfInterrupt() {
    Thread.currentThread().interrupt();//中断当前线程。
}当然,我们可以看到有一个finally块,如果是失败的情况下,需要执行cancelAcquire方法。
正常的acquire方法是不会失败的。只有设置了等待时间。或者中断可以抛异常的情况,会失败。
lockInterruptibly()
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//判断当前线程是否已经被中断了,如果已经中断,则抛出中断异常InterruptedException;
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 当前线程如果已被中断,则抛出中断异常。
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryLock(long timeout, TimeUnit unit)
这个方法既可以设置阻塞时间,也可以响应中断异常。
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout; // 计算好deadline时间
    final Node node = addWaiter(Node.EXCLUSIVE);// 节点入队
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) // 超过等待时间,并且还是为获取到锁,则返回false;
                return false;
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);//挂起当前线程nanosTimeout纳秒;
            if (Thread.interrupted()) // 判断当前线程是否挂起。
                throw new InterruptedException();// 抛出中断异常
        }
    } finally {
        if (failed)
            // return false. 和 throw new InterruptedException 会维持failed为true。即
            // 都是属于异常情况,表示当前线程获取锁异常,不再等待抢占锁,
            // 所以将当前线程封装的节点的waitStatus设置为CANCELLED;
            cancelAcquire(node);
    }
}以下就是在发生阻塞超时和被中断的情况下,需要取消掉当前节点。
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    node.thread = null; // 第一步:清空持有的线程
    // 将节点断开,从当前节点往前,去掉前面所有的取消节点。
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // pred = pred.prev;
    // node.prev = pred;
    
    //记录node前驱节点的next指针指向。
    Node predNext = pred.next;
    
    // 设置节点的waitStatus为CANCELLED。
    node.waitStatus = Node.CANCELLED;
    // 判断node节点是否为尾结点。如果是尾结点,则将node的前驱节点设置为尾结点。
    // 被唤醒的都是头结点的下一个节点,如果此时节点又是尾结点,此时表示后面没有节点。直接想当前节点从队列中去除即可。
    if (node == tail && compareAndSetTail(node, pred)) {
        // CAS设置新的tail节点的next指针设置为null;
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // pred 被取消的节点的前驱节点。
        // node 被取消的节点
        // pred节点不为head && (前驱节点waitStatus不为SIGNA或者设置为SIGNAL成功) && pred所持有的线程不为null;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next; // node节点的next指针。
            if (next != null && next.waitStatus <= 0)
                //设置node.next为pred.next; 即将node节点断链,从队列中移除。
                compareAndSetNext(pred, predNext, next);
        } else {
            // 如果pred节点为头结点。或者pred节点的状态不为-1并且设置waitStatus为-1失败。 
            // 或者pred节点的thread为null。
            // 则唤醒node节点的下一个节点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}unparkSuccessor()
下面就是唤醒节点的代码了。
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        // 如果头结点的状态不为0,则需要更新为0;
        // 为什么需要将waitStatus修改为0.因为在唤醒之后,在非公平锁的情况下,被唤醒的节点可能还是无法
        // 抢占到锁的,所以有可能重新park(); 那我们看看park()的条件
        //  if (shouldParkAfterFailedAcquire(p, node) &&
        //                    parkAndCheckInterrupt())
        // shouldParkAfterFailedAcquire{
        // compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//把节点状态修改为-1
        //}
        // 在park前,需要将waitState修改为-1;即只能park-1的节点。
        // 在ReentrantLock下。此时ws只能为0.才会走到上面的语句 compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;//获取头结点的下一个节点。
    if (s == null || s.waitStatus > 0) {
        // 但是遇到s==null时,表示我们可能遇到了一种中间状态,即next节点还没有连接好,所以我们可以从后往前找
        // 如果s节点已经取消。则需要断链。
        s = null;
        // 倒序遍历到node后的最近一个不为CANCALLED的节点。
        /**
             * 重点:为什么需要从后往前遍历节点。
             * 我们可以看到下面。node节点入队的时候。并不是原子性的。
             * 即:当我们执行到step2的时候,此时tail节点是指向了后一个节点。
             * 但是前驱节点还没有指向node,如果此时unpark()执行,从前往后执行,会导致这个节点
             * 无法遍历到,即无法被唤醒、
             *private Node addWaiter(Node mode) {
             *         // 构造一个节点。waiter为当前线程,mode为排它锁类型[Node.EXCLUSIVE]。
             *         Node node = new Node(Thread.currentThread(), mode);
             *         // Try the fast path of enq; backup to full enq on failure
             *         Node pred = tail;//尾结点
             *         if (pred != null) {
             *          //前置节点设置不是同步的,
             *             node.prev = pred;//step1
             *             // CAS设置当前节点为尾结点
             *             if (compareAndSetTail(pred, node)) {//step2
             *                 pred.next = node;//step3
             *                 return node;
             *             }
             *         }
             *         enq(node);// 入队(尾插法)
             *         return node;//返回当前线程构建的节点
             *     }
             */
        // node节点是头结点,从后往前遍历,找到最后一个,即从左往右第一个waitStatus<=0的节点。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 用unpark()唤醒等待队列中最前边的那个未取消线程
        LockSupport.unpark(s.thread);// 唤醒s节点锁持有的线程[即唤醒最前面一个有效的节点]
}release方法:
public final boolean release(int arg) {
    if (tryRelease(arg)) { // 清空锁的占有线程,并将state恢复为0;
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒不为0的节点。即非0表示后一个节点是可以被唤醒的。
            unparkSuccessor(h);
        return true;
    }
    return false;
}到此,就把ReentrantLock的lock()方法和unlock()方法看完了。菜鸟刚研究AQS源码,还有很多不懂的。望个人仁兄指教。
                










