0
点赞
收藏
分享

微信扫一扫

AQS如何保证锁并发安全

Jonescy 2022-01-09 阅读 83

目录

什么是AQS?

AQS哪些过程会存在线程安全风险?

AQS如何保证线程安全?


前言,关于AQS的源码介绍有很多参考,但是回归本源,为什么要设计AQS,它为实现解决并发的各类锁提供了基础,那它又是怎么处理和保证锁并发安全的呢,这里简要并深究的分析下其实现方式。内容供参考,欢迎领导和同志们指导和提出意见。(关于并发编程,个人推荐一个很优秀且教学清晰的up主:寒食君)

什么是AQS?

还是简要总结一下,AbstractQueuedSynchronizer(AQS)!类如其名,抽象的队列式的同步器。它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列。底层实现的数据结构是一个双向链表。基于CAS实现锁操作和并发安全

AQS哪些过程会存在线程安全风险?

这里考虑一般的对竞争资源获取的情况,主要有哪些步骤?

1.争抢锁,必须保证有且只有一个线程可以获取锁。

2.未获取锁的线程要准确无误的进入队列等待及决定是否安全挂起。

3.获取锁的线程要可以安全的释放锁,并唤醒一个等待队列的线程。

AQS如何保证线程安全?

下面结合源码(ReentrantLock中的非公平锁NonfairSync )对上面这些过程进行分析:

争抢锁,必须保证有且只有一个线程可以获取锁:关键词 state 变量

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    ...

   /** 尝试获取锁,不成功则加入等待队列
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
   public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 争抢锁资源,非公平锁实现方法如下:nonfairTryAcquire()
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    ...

        /**争抢锁资源,通过cas操作获取锁
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

    ...
}

主要过程为 205 行,通过使用compareAndSetState原子操作,保证有且只有一个线程可以加锁成功。

未获取锁的线程要准确无误的进入队列等待及决定是否安全挂起:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    ...

   /** 尝试获取锁,不成功则加入等待队列
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 加入等待队列,等待获取锁
            selfInterrupt();
    }

    ...

    /**让线程有序安全的进入等待队列,保证线程安全进入队列
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
611         if (compareAndSetTail(pred, node)) { // 原子操作入队
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
   
   ...

   /** 加入队列会存在失败的情况,故自旋直至加入成功为止
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
   private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    ...

    /**在队列中等待获取锁资源,准备挂起,并在唤醒后继续尝试获取锁,自旋直至成功
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    ...
  
    /**判断当前线程是否应该挂起 (这里也比较重要,这里保证了挂起的线程会被获取锁的线程唤醒)
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /* 这里也是一个关键步骤,阻塞线程会设置前置节点的waitStatus标识为SIGNAL,并在设置成 
             * 功后,才进行挂起。前置节点的线程释放锁后,会根据此值判断是否需要唤醒下一个线程。
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

主要过程为 addWaiter()方法 611 行以及enq()方法,通过使用compareAndSetTail,compareAndSetHead原子操作,保证有序安全入队。

获取锁的线程可以安全的释放锁,并唤醒一个等待队列的线程:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    ...

    /**释放锁,并在完全释放锁(锁可重入)后,唤醒等待队列的线程
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
1263    if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  
    ...

    /**唤醒可能挂起的等待队列的下一个节点线程
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    ...
}

注意到release()方法中 1263 行,会对节点的waitStatus标识进行判断,若!=0,则尝试唤醒下一节点线程。如入队过程中 shouldParkAfterFailedAcquire()方法中所示,下一节点挂起时,会在修改前置节点的waitStatus状态成功后才会进行挂起。故若下一节点挂起,则可以保证进行唤醒操作。

那么这样就是安全可靠的吗?这里并没有使用类似compareAndSetPark操作,考虑上面的情况:若是shouldParkAfterFailedAcquire之后,该线程还未挂起,但是前一个节点已经调用唤醒该线程的unpark操作,随后该线程执行park操作,那么是不是该线程便无人可以唤醒了?

这里需要了解下park与unpark,可参考附录链接;park与unpark操作是基于_counter(计数器)、 _mutex(互斥量)、_cond(条件变量)3个角色实现。简单来说,unpark操作将_counter设置为1,会唤醒线程。park操作将_counter设置为0,且判断是否_counter原值为1,若是则不会阻塞线程。大概类似于先吃后付款,先付款后吃都要给饭吃,不能白白unpark。

AQS主要是通过CAS操作保证线程安全,并有一套完善的逻辑保证资源的获取和释放。

附言:AQS的知识点和设计巧妙之处有很多,这里只是大致的分析了AQS如何实现锁及保证线程安全,还有许多知识需要仔细分析,比如 AQS 唤醒时为何从尾部开始查找?(线程安全问题)新建第一个阻塞队列节点时为什么加入一个空的线程头节点?(哨兵节点)ReentrantLock非公平锁与公平锁的区别?(实际上非公平指的是唤醒的线程与新线程抢锁的不公平,队列内的线程算是公平的)

休息一波开个盲盒(谨慎点击):

http://p6.itc.cn/images01/20201108/b9b0d238a3d145a18e81a51298a0c248.gif

参考链接地址:

AQS这样学就很简单了 - 知乎

Java并发编程——LockSupport的park和unpark - 简书

LockSupport的park/unpark分析_u013978512的博客-CSDN博客

Java AQS unparkSuccessor 方法中for循环为什么是从tail开始而不是head_womeiyouzaihuashui的博客-CSDN博客_unparksuccessor为什么从尾部

https://blog.51cto.com/u_15127639/2873074

举报

相关推荐

0 条评论