0
点赞
收藏
分享

微信扫一扫

浅析可重入锁的加锁释放锁过程

绪风 2022-04-23 阅读 57
java

一、AQS

AQSAbstractQueuedSynchronizer,抽象队列同步器,是juc的基础,如常用的ReentrantLockSemaphoreCountDownLatchReentrantReadWriteLock等都是基于AQS来进行设计的。

AQS通过维护一个同步状态state和一个双向队列CLH来实现加锁释放锁。

CLH队列中存放的Node结点主要关注如下几个属性:

  • waitStatus:等待状态,取值分别为CANCELLED = 1SIGNAL = -1CONDITION = -2PROPAGATE = -3,以及初始时的0
  • prev:当前结点的前驱结点
  • next:当前结点的后继结点
  • thread:当前结点对应的线程

二、加锁

1、调用lock.lock()进行加锁,lock()方法内部调用sync.lock()方法进行加锁,非公平锁中,syncNonfairSync的实例,其具体加锁代码如下

final void lock() {
    // 自旋,设置AQS中同步状态 state 的值,如果 state 为0,表示当前没有线程持有锁,则将其设置为1
    if (compareAndSetState(0, 1))
        // 设置当前线程持有排他锁,加锁结束
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 如果自旋失败,则表示已经有线程持有锁,state 的值不为0(大于0)
        // 则通过排他模式获取锁,如果获取不成功,则进入队列
        acquire(1);
}

2、acquire方法,代码如下

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

2.1、该方法的重点在于if判断,首先是 tryAcquire,在子类NonfairSync中实际调用的是nonfairTryAcquire

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 获取同步状态
    int c = getState();
    // 若同步状态为0,表示当前锁处于空闲状态
    if (c == 0) {
        // 尝试抢占,这里有并发的可能,所以有可能会抢不到
        if (compareAndSetState(0, acquires)) {
            // 如果抢到了,那就设置持有者为当前线程,并返回true
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果当前线程已经持有了锁,则重入
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 当前线程未持有锁
    return false;
}

通过上面代码上的注释分析,可以知道,tryAcquire如果返回true,则表示当前线程持有了锁,如果未持有锁,则进入acquireQueued

2.2、acquireQueued中首先会执行内层方法addWaiter(Node.EXCLUSIVE),将当前线程加入到等待队列队尾,Node.EXCLUSIVE表示排他模式,对应的还有共享模式

private Node addWaiter(Node mode) {
    // 构建当前线程对应的队列结点
    Node node = new Node(Thread.currentThread(), mode);
    // 获取队尾结点,如果是首次往等待队列中添加结点,则 tail 指向null
    Node pred = tail;
    if (pred != null) {
        // 如果队列不为空,则尾插法,将当前线程结点放到队尾,如果入队成功了,就返回这个结点
        // 这一步在存在并发的情况下可能会设置失败,那就会继续往下执行 enq 方法,通过自旋来将当前结点放置到队尾
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // 自旋操作
    for (;;) {
        // 队尾结点
        Node t = tail;
        // 如果队尾结点为空,则表示队列为空,需要进行初始化
        if (t == null) {
            // 注意这一步,初始化设置头结点的时候,并不是将入队的 node 结点放入队首,而是new了一个新的结点
            // 这时候,这个头结点其实是一个傀儡结点,或是哨兵结点 
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 尾插法,将当前线程结点放入队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

到了这里,可以知道,在执行了addWaiter之后,队列中有两个结点,头结点自然是傀儡结点,其属性中,waitStatus为0,thread为null,pre指向null,next指向当前队列中的第一个真正的等待结点。

第二个结点才是队列中第一个真正的结点,也是队尾结点,此时其waitStatus为0,pre指向头结点(傀儡结点),next指向null,thread为对应的线程。

2.3、内层方法addWaiter执行完后,执行外层方法acquireQueued,代码如下

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 返回node的前驱结点,如果node是第一个等待结点,则返回的是头结点(傀儡结点)
            final Node p = node.predecessor();
            /*
            如果node的前驱是头结点,则尝试获取锁,tryAcquire方法在上面已经分析过了
            tryAcquire会尝试获取锁,但不一定会获取到,因为是非公平锁,
            所以在尝试获取锁的时候,可能会被其他线程抢占
            */
            if (p == head && tryAcquire(arg)) {
                // 如果当前线程抢到了锁,则将当前线程对应的node设置为新的傀儡结点
                /*
                private void setHead(Node node) {
                    head = node;
                    node.thread = null;
                    node.prev = null;
                }
                */
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            /*
            如果当前结点的前驱不是头结点或没有抢到锁则进入if判断
            shouldParkAfterFailedAcquire方法一看名字就知道作用了,用于判断是否应该在获取锁失败后将线程挂起
            shouldParkAfterFailedAcquire如果返回true,说明当前线程需要被挂起
            通过parkAndCheckInterrupt挂起当前线程,直到线程被唤醒后返回其中断状态
            */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // 状态大于0,就只有一个取值,也就是CANCELLED,前驱结点取消等待,那就将其从队列中移除
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将前驱结点的状态设置为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire的源码来看,队列中除了最后一个结点的waitStatus是0外(初始状态为0),其余结点的waitStatus都不是0。

private final boolean parkAndCheckInterrupt() {
    // 挂起当前线程,挂起后,return不会执行,直到等到unpark后才会继续执行
    LockSupport.park(this);
    // 唤醒后,才会继续执行return语句
    return Thread.interrupted();
}

看到LockSupport就不得不提一下java中线程等待唤醒的几种方法

  • 使用Objectwait()方法让线程等待,使用notify()notifyAll()进行唤醒
  • 使用JUC包中Conditionawait()方法让线程等待,使用signal()方法唤醒线程
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
  • 使用LockSupportpark()让线程等待,使用unpark()进行唤醒

前两种方法都是必须在锁里才能执行,synchronized里面执行waitnotify,在lock里面执行awaitsignal,如果没有锁,会报错,而且对加锁和解锁的顺序有要求,如果先解锁后加锁会导致卡死。

LockSupport俗称锁中断,它不需要持有锁也可以进行加锁解锁,而且加锁解锁先后顺序无要求,可以先解锁后加锁也不会导致卡死。这里只是简单提一下,具体的还是得实操。

回到正题,通过acquireQueued代码段中的注释分析可以知道,当node结点对应线程进入此方法后,如果其前驱是头结点(傀儡结点)则它会尝试获取锁,获取到锁之后它就变成了新的头结点。

而如果其前驱不是头结点,或者前驱是头结点但没有抢占到锁,则它会被挂起,直到持有锁的线程释放锁后,并且其满足被唤醒条件后才会被唤醒(见后面的unlock分析)又或者被中断,直到其成功抢到锁。

三、解锁

public void unlock() {
    // 如果当前线程持有锁,则将持有数量-1,为0后释放锁
    sync.release(1);
}

release代码如下

public final boolean release(int arg) {
    // 尝试释放锁,因为是可重入锁,所以锁的同步状态state可能会大于1
    // 所以tryRelease执行后当前线程可能依旧持有锁
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒头结点的后继结点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease执行的是ReentrantLock.Sync#tryRelease,代码如下

protected final boolean tryRelease(int releases) {
    // getState获取锁的状态,或者说加锁次数
    int c = getState() - releases;
    // 如果持有锁的不是当前线程,则抛出异常,即不允许释放其他线程的锁
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 锁的释放状态
    boolean free = false;
    if (c == 0) {
        // 如果加锁次数为0,就表示锁已经完全被释放了
        free = true;
        // 没有线程持有锁
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

release中,如果锁被完全释放了,则会调用unparkSuccessor来唤醒头结点的后继结点对应的线程,唤醒后,就回到了上面分析过的parkAndCheckInterrupt中的返回语句继续争抢锁。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒头结点的后继结点对应线程
        LockSupport.unpark(s.thread);
}

整个流程大致就是这样子,不得不感叹,设计AQS的几位大佬真的是了不起。

举报

相关推荐

0 条评论