0
点赞
收藏
分享

微信扫一扫

结合ReetrantLock理解AQS

南柯Taylor 2021-09-24 阅读 74

AQS简介


Java中Juc包如图所示,基于volatile和CAS搭建起Juc包,其中AQS是其中很重要的抽象框架,因为上层的许多同步工具类都是基于AQS实现的,如ReetrantLock、CountDownLatch、ReadWriteLock、Condition等等,如果在Java内置的同步工具类不能满足你的开发需求,你还可以根据业务场景自定义同步工具类。AQS是同步框架的抽象,维护着共享资源state的访问,并内置一个同步等待队列,在资源不足时,使线程进入队列等待。AQS中封装了出入队的细节,而上层代码只需要定义资源的获取和释放方式

四个重要的方法

根据资源的是否可共享性,可以有选择性地实现独占的tryAcquire、tryRelease,或者实现共享的tryAcquireShared、tryReleaseShared方法。甚至ReadWriteLock实现了两种资源占用方式。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

ReetrantLock源码

之所以通过ReetrantLock来切入AQS,是因为AQS是一个比较抽象的框架,而ReetrantLock的lock()和unlock()方法是我们经常调用的,通过ReetrantLock能够更加容易理解AQS。

lock()

lock()方法利用CAS将state从0置为1,CAS原子操作,保证多线程环境下只有一个线程可以执行成功,然后该线程调用setExclusiveOwnerThread设置独占线程,这是为了线程的可重入而设置的。而其他线程没有设置成功,就会进入aquire方法

       final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
aquire()

aquire()方法是AQS中定义的方法,用final修饰,不可重写。这个方法先是尝试获取资源,成功的话就返回。否则调用addWaiter初始化Node,并设置为独占模式并入队,acquireQueued方法则是将当前线程结点的前继结点状态修改为SIGNAL,表明当前继结点获取到资源时,会通知后继者,将其唤醒。同时acquireQueued会返回是否中断的标识。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
tryAquire

如果看AQS的源码,我们会发现这个方法直接抛出异常,是因为这个方法是留给子类来实现的。那么它为什么不直接定义为抽象方法呢?是因为如果定义为抽象方法,那么独占和共享模式都必须重写tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared等方法。对于只需要实现一种模式的子类来说没有必要。我们来看看ReetrantLock中非公平锁的tryAquire方法。
通过getState方法获取资源,如果c为0,说明资源可获取,就用CAS获取,并设置独占线程,返回成功true。如果没有资源了,判断当前线程是否是资源的独占者,如果是,就把state更新。

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
addWaiter

这个方法,目的就是把获取不到资源的线程进入到同步队列进行等待。如果当前在队列已经初始化过了,直接用CAS更新tail结点,更新指针。要不然调用enq方法入队,enq方法其实就是初始化头结点,再把当前节点插入。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
acquireQueued

入队之后,AQS在调用park阻塞自身之前还先看前继是不是头结点,如果前继是头结点,说明如果资源释放了,就先轮到head节点,于是就调用tryAcquire尝试获取,如果获取成功就把head节点释放,当前节点设置为head节点,然后返回。如果前继不是头结点,获取头结点没有获取到资源,就调用shouldParkAfterFailedAcquire方法。shouldParkAfterFailedAcquire方法主要是检查前继结点状态是否为SIGNAL,如果不是,就用CAS设置为SIGNAL。这样本线程结点就可以调用parkAndCheckInterrupt中的LockSupport.park(this);将自己阻塞,注意这里的阻塞跟synchronized不同,park调用会使线程变为WAITING状态,等待唤醒unpark由于是在for循环中调用的,因此这个方法会使CPU一直处于空忙。

 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

unlock()

unlock释放锁

    public void unlock() {
        sync.release(1);
    }
release

release是AQS定义的释放独占锁的方式,由于锁是独占的,那么释放独占锁也不存在线程竞争,调用子类定义的tryRelease释放资源,然后获取头结点,将头结点的后继结点唤醒。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
tryRelease
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
举报

相关推荐

0 条评论