0
点赞
收藏
分享

微信扫一扫

【JUC系列】LOCK框架系列之八 核心锁类之StampedLock

快乐小鱼儿_9911 2022-05-01 阅读 19
java

StampedLock

文章目录

简介

ReentrantReadWriteLock 在沒有任何读写锁时,才可以取得写入锁,这可用于实现了悲观读(Pessimistic Reading),即如果执行中进行读取时,经常可能有另一执行要写入的需求,为了保持同步,ReentrantReadWriteLock 的读取锁定就可派上用场。然而,如果读取执行情况很多,写入很少的情况下,使用 ReentrantReadWriteLock 可能会使写入线程遭遇饥饿(Starvation)问题,也就是写线程迟迟无法竞争到锁定而一直处于等待状态。StampedLock可以环境这个问题。

  1. 它是java8在java.util.concurrent.locks新增的一个API。

  2. 它是一种基于性能的锁,具有三种用于控制读/写访问的模式。三种模式是:

    • Writing锁

      writeLock方法可能会阻塞等待独占访问,返回可以在unlockWrite方法中用于释放锁的标记。还提供了不定时和定时版本的 tryWriteLock。当锁保持在写模式时,不可能获得读锁,并且所有乐观读验证将失败。

    • Reading锁

      readLock 方法可能会阻塞等待非独占访问,返回可用于方法 unlockRead 以释放锁的标记。还提供了不定时和定时版本的 tryReadLock。

    • Optimistic Reading

      仅当锁当前未处于写入模式时,方法 tryOptimisticRead 才会返回非零标记。可以通过validate(long stamp)判断这个标记时候是否有写锁。

      这种模式可以被认为是一个非常弱的读锁版本,可以随时被写入者打破。 对简短且只读代码段使用乐观模式通常会减少争用并提高吞吐量。 然而,它的使用本质上是脆弱的。 乐观读取部分应该只读取字段并将它们保存在局部变量中以供验证后使用。 在乐观模式下读取的字段可能非常不一致,因此仅当您对数据表示足够熟悉以检查一致性和/或重复调用方法 validate() 时才适用。 例如,当首先读取对象或数组引用,然后访问其字段、元素或方法之一时,通常需要执行此类步骤。

  3. StampedLock 的状态state由版本和模式组成。

  4. 锁的获取方法返回一个标记,它代表和控制与锁状态相关的访问; 这些方法的“try”版本可能会返回特殊值零来表示无法获取访问权限。锁释放和转换方法需要标记作为参数,如果它们与锁的状态不匹配,则会失败。

  5. 此类还支持有条件地提供跨三种模式的转换的方法。 例如,方法 tryConvertToWriteLock 尝试“升级”模式,如果 (1) 已经处于写入模式 (2) 处于读取模式并且没有其他读取器或 (3) 处于乐观模式并且锁可用,则返回有效的写入标记 . 这些方法的形式旨在帮助减少在基于重试的设计中出现的一些代码膨胀。

  6. StampedLocks 旨在用作开发线程安全组件的内部实用程序。它们的使用依赖于对它们所保护的数据、对象和方法的内部属性的了解。它们不是可重入的,因此锁定的主体不应调用其他可能尝试重新获取锁的未知方法(尽管您可以将标记传递给可以使用或转换它的其他方法)。读锁模式的使用依赖于相关的代码段是无副作用的。未经验证的乐观读取部分不能调用未知的方法来容忍潜在的不一致。Stamps使用有限的表示,并且在密码学上不安全(即,有效的Stamps 可能是可猜测的)。Stamps 值可以在(不早于)连续运行一年后回收。超过此期限而未使用或验证的Stamps 可能无法正确验证。 StampedLocks 是可序列化的,但总是反序列化为初始解锁状态,因此它们对于远程锁定没有用处。

下面是java doc提供的StampedLock一个例子

import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.StampedLock;

public class Point {
        private double x, y;
        private final StampedLock sl = new StampedLock();

        void move(double deltaX, double deltaY) { // an exclusively locked method 写锁 独占锁
            long stamp = sl.writeLock();
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(stamp);
            }
        }

        double distanceFromOrigin() { // A read-only method 读锁
            long stamp = sl.tryOptimisticRead();
            double currentX = x, currentY = y;
            // 锁的情况发生了变化,乐观锁要升级为悲观读了。
            if (!sl.validate(stamp)) {
                stamp = sl.readLock();
                try {
                    currentX = x;
                    currentY = y;
                } finally {
                    sl.unlockRead(stamp);
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }

        void moveIfAtOrigin(double newX, double newY) { // upgrade
            // Could instead start with optimistic, not read mode
            long stamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) {
                    long ws = sl.tryConvertToWriteLock(stamp);
                    if (ws != 0L) {
                        stamp = ws;
                        x = newX;
                        y = newY;
                        break;
                    } else {
                        sl.unlockRead(stamp);
                        stamp = sl.writeLock();
                    }
                }
            } finally {
                sl.unlock(stamp);
            }
        }
}

实现分析

核心思想

如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。同样使用CLH列队进行锁实现。不同与AQS的是,如果是写请求或者悲观读请求,则在获取不到的情况下进行队列阻塞。每个写请求都是一个结点与读结点形成一个"读写相间链表“,而读与读之间是在"读写相间链表“上的读结点上新增一个读链表。具体可见数据结构上的图。

组成

构造函数

    public StampedLock() {
        // 初始化同步状态值
        state = ORIGIN;
    }

主要成员

state

锁的序列亦或状态

/** Lock sequence/state */
private transient volatile long state;

在这个类中,同步状态为采用了long型state,它拥有读锁,写锁,数据版本三个含义。

    /** The number of bits to use for reader count before overflowing */
    // 读锁占用的位数
    private static final int LG_READERS = 7;

    // Values for lock state and stamp operations
    // 一个读状态单位
    private static final long RUNIT = 1L;
    // 写状态标识位
    private static final long WBIT  = 1L << LG_READERS;
    // 读状态的标识位
    private static final long RBITS = WBIT - 1L;
    // 读锁的数量
    private static final long RFULL = RBITS - 1L;
    // 读锁和写锁的或运算---将写锁和读锁“相加”起来了-代表锁的全部情况
    private static final long ABITS = RBITS | WBIT;
    // 读锁状态的非运算???-用来检查读锁情况的辅助变量
    private static final long SBITS = ~RBITS; // note overlap with ABITS 注意与ABITS重叠
 
    // Initial value for lock state; avoid failure value zero
    // 状态的初始化
    private static final long ORIGIN = WBIT << 1;

根据上述代码可以发现,64位中,低7位代表读锁的情况。第8为代表写锁的情况。整个state本身值代表版本?

其二进制表示

            -------------------  -------------------  -------------------  -------------------
WBIT :      0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 1000 0000

RBITS :     0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 0111 1111

RFULL :     0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 0111 1110

ABITS :     0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 1111 1111

SBITS :     1111 1111 1111 1111  1111 1111 1111 1111  1111 1111 1111 1111  1111 1111 1000 0000

ORIGIN :    0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 0000 0000  0000 0001 0000 0000

WNode

链表的结点类

    /** Wait nodes */
    static final class WNode {
        // 前驱结点
        volatile WNode prev;
        // 后继结点
        volatile WNode next;
        // 下一个读结点(读线程的列表)
        volatile WNode cowait;    // list of linked readers
        // 结点代表的线程
        volatile Thread thread;   // non-null while possibly parked
        // 结点状态 默认值0 等待中WAITING 已取消 CANCELLED
        volatile int status;      // 0, WAITING, or CANCELLED
        // 结点模式 读结点还是写结点
        final int mode;           // RMODE or WMODE
        // 有参构造函数
        WNode(int m, WNode p) { mode = m; prev = p; }
    }
whead和wtail

链表的头结点和尾结点

    /** Head of CLH queue */
    private transient volatile WNode whead;
    /** Tail (last) of CLH queue */
    private transient volatile WNode wtail;

主要方法

方法名描述
long writeLock()独占获取锁,如果获取资源失败会进入队列排队,阻塞直到获取到锁。不响应中断
long tryWriteLock()如果锁立即可用,则独占获取锁。不会阻塞。如果返回值为0代表不可以用。不响应中断
long tryWriteLock(long time, TimeUnit unit) throws InterruptedException如果在规定时间内获取到锁并且当前线程没有被中断,则独占获取锁。 超时和中断下的行为与为方法 tryLock(long,TimeUnit) 指定的行为相匹配。响应中断
long writeLockInterruptibly() throws InterruptedException独占获取锁,必要时阻塞,直到可用或当前线程被中断。 中断下的行为与为方法 lockInterruptibly() 指定的行为相匹配。响应中断
long readLock()非独占获取锁,必要时阻塞直到获取到锁。悲观读 不响应中断
long tryReadLock()如果锁立即可用,则非独占获取锁。自旋获取锁。如果返回值为0代表不可以用。不响应中断
long tryReadLock(long time, TimeUnit unit) throws InterruptedException如果在规定时间内获取到锁并且当前线程没有被中断,则非独占获取锁。 超时和中断下的行为与为方法 tryLock(long,TimeUnit) 指定的行为相匹配。响应中断
long readLockInterruptibly() throws InterruptedException非独占获取锁,必要时阻塞,直到可用或当前线程被中断。 中断下的行为与为方法 lockInterruptibly() 指定的行为相匹配。响应中断
long tryOptimisticRead()返回可以稍后验证的标记,如果存在独占锁,则返回零。
boolean validate(long stamp)如果参数标记发行以来尚未存在独占锁,则返回 true。 如果标记为零,则始终返回 false。 如果标记表示当前持有的锁,则始终返回 true。 使用不是从tryOptimisticRead获得的值或此锁的锁定方法调用此方法没有定义的效果或结果。
void unlockWrite(long stamp)如果锁状态与给定的标记匹配,则释放独占锁。
void unlockRead(long stamp)如果锁状态与给定的标记匹配,则释放非独占锁。
void unlock(long stamp)如果锁状态与给定的标记匹配,则释放相应的锁模式。
long tryConvertToWriteLock(long stamp)如果锁定状态与给定标记匹配,则执行以下操作之一。
1.标记表示持有写锁,则返回它。
2.标记表示持有读锁,如果写锁可用,则释放读锁并返回写锁的标记。
3.标记表示持有乐观读取,则仅在立即可用时才返回写锁的标记。
4.在所有其他情况下,此方法返回零。
long tryConvertToReadLock(long stamp)如果锁定状态与给定标记匹配,则执行以下操作之一。
1.标记表示持有写锁,则释放它并获得读锁。
2.标记表示持有读锁,则返回它。
3…标记表示持有读锁,则获取读取锁并仅在立即可用时返回读取标记。
4.在所有其他情况下,此方法返回零。
long tryConvertToOptimisticRead(long stamp)如果锁定状态与给定标记匹配,则
1…标记表示持有锁,则释放它并返回观察标记。
2.标记表示持有读锁,则在验证后返回。
3.此方法在所有其他情况下都返回零,因此可用作“tryUnlock”的一种形式。
boolean tryUnlockWrite()如果持有写锁,则不需要标记值进行释放写锁定。 此方法可能对错误后的恢复很有用。
boolean tryUnlockRead()如果持有读锁,则不需要标记值进行释放一个读锁定。 此方法可能对错误后的恢复很有用。
boolean isWriteLocked()锁是否是独占锁
boolean isReadLocked()锁是否是非独占锁
int getReadLockCount()查询该锁持有的读锁数。 此方法设计用于监视系统状态,而不是用于同步控制。
Lock asReadLock()返回此 StampedLock 的普通 {@link Lock} 视图,其中 {@link Lock#lock} 方法映射到 {@link #readLock},其他方法也类似。 返回的 Lock 不支持 {@link Condition}; 方法 {@link Lock#newCondition()} 抛出 {@code UnsupportedOperationException}。
Lock asWriteLock()返回此 StampedLock 的普通 {@link Lock} 视图,其中 {@link Lock#lock} 方法映射到 {@link #writeLock},其他方法也类似。 返回的 Lock 不支持 {@link Condition}; 方法 {@link Lock#newCondition()} 抛出 {@code UnsupportedOperationException}。
ReadWriteLock asReadWriteLock()返回此 StampedLock 的 {@link ReadWriteLock} 视图,其中 {@link ReadWriteLock#readLock()} 方法映射到 {@link #asReadLock()},{@link ReadWriteLock#writeLock()} 映射到 {@link #asWriteLock()}。

数据结构

在这里插入图片描述

写锁的获取与释放

writeLock()

public long writeLock() {
    long s, next;  // 仅在完全解锁的情况下绕过 acquireWrite
    return ((((s = state) & ABITS) == 0L &&
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));
}

state&ABITS==0成立时,可以使用CAS去更新状态值,如果更新成功表示获取写锁成功。则可以推论writeLock() 方法的返回值为384(110000000)代表获取锁成功。否则执行acquireWrite(false, 0L)进入同步队列。这种判断方式代表写锁不可重入。

state&ABITS==0意味着什么?ABITS代表全部写锁与读锁结合的情况。与其进行位与操作,如果结果为0说明目前没有锁,可以进行CAS操作。

            -------------------  -------------------  -------------------  -------------------
ABITS :     0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 1111 1111

long acquireWrite(boolean interruptible, long deadline)

    private long acquireWrite(boolean interruptible, long deadline) {
        WNode node = null, p;
        // 第一个自旋--正式加入队列之前的获取锁重试
        for (int spins = -1;;) { // spin while enqueuing
            long m, s, ns;
            // 执行与writeLock()请求锁方法,拿到锁直接退出
            if ((m = (s = state) & ABITS) == 0L) {
                if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                    return ns;
            }
            // 自旋标识小于0,尝试更新自旋标识
            else if (spins < 0)
                // 如果当前的标记是写锁,且队列头尾是相同结点即无队列,则用根据CPU线程情况更新自旋次数(可能为0),否则次数直接赋0。
                spins = (m == WBIT && wtail == whead) ? SPINS : 0;
            else if (spins > 0) {
                // 当自旋次数>0时,尝试将减少自旋次数
                if (LockSupport.nextSecondarySeed() >= 0)
                    --spins;
            }
            // 能到这说明spins = 0。
            // 通过尾结点的null判断队列是否存在,不存在就进行初始化
            else if ((p = wtail) == null) { // initialize queue
                // 创建一个写线程的结点,前驱结点为null
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    // 此结点也是尾结点
                    wtail = hd;
            }
            // 到这说明spins = 0 且队列已经初始化过了。
            else if (node == null)
                // 构建结点,以尾结点作为前驱结点--此时尾结点还未指向新结点
                node = new WNode(WMODE, p);
            // 确保结点的前驱结点是尾结点
            else if (node.prev != p)
                node.prev = p;
            // 将当前的结点更新为尾结点,跳出自旋--正式加入队列排队去吧
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }

        // 第二个自旋
        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            // 此时的p应该是node的前驱结点,当p变成首结点时,当前结点开始尝试获取锁啊
            if ((h = whead) == p) {
                if (spins < 0)
                    // 首结点的自旋次数
                    spins = HEAD_SPINS;
                // 自旋次数小于最大的首结点自旋次数,对自旋次数进行累加
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                // 内部自旋-首结点下的自旋
                for (int k = spins;;) { // spin at head
                    long s, ns;
                    // 申请锁
                    if (((s = state) & ABITS) == 0L) {
                        if (U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + WBIT)) {
                            // 申请到锁之后,将当前结点变为首结点
                            whead = node;
                            node.prev = null;
                            return ns;
                        }
                    }
                    // 自旋次数不足 就跳出自旋
                    else if (LockSupport.nextSecondarySeed() >= 0 &&
                             --k <= 0)
                        // 还是没拿到锁
                        break;
                }
            }
            // 当前结点的前驱结点还不是首结点 且首结点不是null
            else if (h != null) { // help release stale waiters
                WNode c; Thread w;
                // //如果h为悲观读锁节点,则唤醒这个结点下所有的读锁结点,
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            // 如果其他线程在上述代码执行期间没有释放锁,就尝试将线程挂起啊--都到这了 还没拿到锁	
            if (whead == h) {
                // 什么情况会导致当前结点的前驱结点不是p--前驱取消不在链表中了
                if ((np = node.prev) != p) {
                    // 如果当前结点的前驱结点不为null,将np赋予p,并结点作为当前p的后继结点,重连链表
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                // 如果p的结点状态还是初始化,将其变为等待
                else if ((ps = p.status) == 0)
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                // 如果p的结点是已取消,会尝试将p的前驱结点作为当前结点的前驱节点重新建立联系。p就从队列上清除了
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                // 到这说明p是当前结点的前驱结点,且p的结点状态已经是等待中了
                else {
                    long time; // 0 argument to park means no timeout park 的 0 参数表示没有超时
                    // 0代表没有超时限制
                    if (deadline == 0L)
                        time = 0L;
                    // 超时了本结点取消等待
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    //如果前继节点为WAITING,并且再一次抢锁失败就挂起
                    if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                        whead == h && node.prev == p)
                        U.park(false, time);  // emulate LockSupport.park
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    //如果被中断了,就将node设置为cancel
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }

nextSecondarySeed()

static final int nextSecondarySeed() {
    int r;
    Thread t = Thread.currentThread();
    if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
        r ^= r << 13;   // xorshift
        r ^= r >>> 17;
        r ^= r << 5;
    }
    else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
        r = 1; // avoid zero
    UNSAFE.putInt(t, SECONDARY, r);
    return r;
}

cancelWaiter(WNode node, WNode group, boolean interrupted)

    private long cancelWaiter(WNode node, WNode group, boolean interrupted) {
        if (node != null && group != null) {
            Thread w;
            node.status = CANCELLED;
            // unsplice cancelled nodes from group 过滤掉当前group下已取消的结点
            for (WNode p = group, q; (q = p.cowait) != null;) {
                // 如果读线程结点的状态是取消
                if (q.status == CANCELLED) {
                    // 将p的WCOWAIT用q的cowait替换
                    U.compareAndSwapObject(p, WCOWAIT, q, q.cowait);
                    p = group; // restart 重置p,此时的group已经经过更新,删除了一个CANCELLED的结点了
                }
                // 直接检查下一个结点的cowait
                else
                    p = q;
            }
            // 以node开始cowait链路中不存在CANCELLED的节点
            if (group == node) {
                // 对整个group中的结点全部唤醒
                for (WNode r = group.cowait; r != null; r = r.cowait) {
                    if ((w = r.thread) != null)
                        U.unpark(w);       // wake up uncancelled co-waiters
                }
                // 当前结点的前驱节点不为空,向前遍历
                for (WNode pred = node.prev; pred != null; ) { // unsplice
                    WNode succ, pp;        // find valid successor
                    // 当前结点的后继为空或者后继结点是取消状态的情况下
                    while ((succ = node.next) == null ||
                           succ.status == CANCELLED) {
                        WNode q = null;    // find successor the slow way
                        // 从队尾向前寻找非CANCELLED结点-重链链表
                        for (WNode t = wtail; t != null && t != node; t = t.prev)
                            if (t.status != CANCELLED)
                                q = t;     // don't link if succ cancelled
                        if (succ == q ||   // ensure accurate successor
                            U.compareAndSwapObject(node, WNEXT,
                                                   succ, succ = q)) {
                            if (succ == null && node == wtail)
                                U.compareAndSwapObject(this, WTAIL, node, pred);
                            break;
                        }
                    }
                    // 逻辑到这说明当前结点的后继结点非null且非取消,在前驱也没变化的情况下直接更新WNEXT
                    if (pred.next == node) // unsplice pred link
                        U.compareAndSwapObject(pred, WNEXT, node, succ);
                    // 尝试唤醒 succ 观察新的 pred
                    if (succ != null && (w = succ.thread) != null) {
                        succ.thread = null;
                        U.unpark(w);       // wake up succ to observe new pred
                    }
                    if (pred.status != CANCELLED || (pp = pred.prev) == null)
                        break;
                    node.prev = pp;        // repeat if new pred wrong/cancelled
                    U.compareAndSwapObject(pp, WNEXT, pred, succ);
                    pred = pp;
                }
            }
        }
        WNode h; // Possibly release first waiter
        while ((h = whead) != null) {
            long s; WNode q; // similar to release() but check eligibility
            if ((q = h.next) == null || q.status == CANCELLED) {
                for (WNode t = wtail; t != null && t != h; t = t.prev)
                    if (t.status <= 0)
                        q = t;
            }
            if (h == whead) {
                if (q != null && h.status == 0 &&
                    ((s = state) & ABITS) != WBIT && // waiter is eligible
                    (s == 0L || q.mode == RMODE))
                    release(h);
                break;
            }
        }
        return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L;
    }

写锁的释放

unlockWrite(long stamp)

public void unlockWrite(long stamp) {
    WNode h;
    // 如果state != stamp,并且 stamp & WABIT == 0 说明被Stamp的写锁标志变为0了,这就属于异常了
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    // state = stamp + WBIT 会把写锁标志的1变为0,并向前进1,也就是版本号会加1
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    //队列不为空,唤醒后继结点
    if ((h = whead) != null && h.status != 0)
        release(h);
}

假设此时stamp只有写锁,通过下表的计算,可以发现,写锁标记位为0 释放了写锁,并且向前进1,这样整个状态变量的值代表的版本号含义也增加了。版本号不停的增加,当超过64位,出现溢出时,此时stamp + WBIT==0L成立,用ORIGIN重置state。

            -------------------  -------------------  -------------------  -------------------
WBIT  :     0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 1000 0000          
stamp :     0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 0000 0000  0000 0001 1000 0000

stamp+stamp:0000 0000 0000 0000  0000 0000 0000 0000  0000 0000 0000 0000  0000 0010 0000 0000

release(WNode h)

    private void release(WNode h) {
        //队列不为空
        if (h != null) {
            WNode q; Thread w;
            //尝试将h的waitstatus变为0,为啥变0??
            U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
            //1、h的next节点被取消了
            //2、h的next节点为null,并不代表队列为空,因为是在入队的时候先设置的node.prev = p,最后才设置的p.next = node,所以要从尾到头遍历
            if ((q = h.next) == null || q.status == CANCELLED) {
                for (WNode t = wtail; t != null && t != h; t = t.prev)
                    // 如果结点的状态不是已取消状态的都唤醒
                    if (t.status <= 0)
                        q = t;
            }
            if (q != null && (w = q.thread) != null)
                //唤醒线程,去抢锁锁
                //之前挂起线程是在一个死循环中挂起的,直到超时或者申请成功锁后,才会退出
                U.unpark(w);
        }
    }

乐观读锁的获取与释放

tryOptimisticRead()

    public long tryOptimisticRead() {
        long s;
        // 如果存在写锁 则结果一直为0。s&SBITS即 版本号+读锁的个数
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }

validate(long stamp)

    public boolean validate(long stamp) {
        //使用LoadLoad屏障,禁止将之前的读取成员属性的操作重排序到后面的验证逻辑之后 
        U.loadFence();
        // stamp=0时 代表当时有写锁 stamp & SBITS=0
        // state & SBITS 如果此时有写锁,其值定不为0;当前无锁,之前的写锁释放了,版本号+1,其值也不为0 state & SBITS != stamp & SBITS
        // stamp!=0时,代表当时无写锁 stamp & SBITS就是版本号+锁标志0。
        // 如果此时有写锁,那么 返回false;如果此时无写锁 ,如果不存在写锁的获取与释放版本号不变 返回true 如果存在写锁的获取与释放版本号变化,返回false
        return (stamp & SBITS) == (state & SBITS);
    }

当validate返回false是将乐观读锁升级为悲观读

readLock()

    public long readLock() {
        long s = state, next;  // bypass acquireRead on common uncontended case
        //1. whead == wtail 代表队列为空
        //2. (s & ABITS) < RFULL 代表当前没有写锁,读锁个数没超出范围
        //3. U.compareAndSwapLong(this, STATE, s, next = s + RUNIT) 将读锁的个数+1
        // 如果1 2 3全部满足则返回next 否则再次读锁申请
        return ((whead == wtail && (s & ABITS) < RFULL &&
                 U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
                next : acquireRead(false, 0L));
    }

acquireRead(boolean interruptible, long deadline)

    private long acquireRead(boolean interruptible, long deadline) {
        WNode node = null, p;
        // 第一个无限循环
        for (int spins = -1;;) {
            WNode h;
            // 首尾结点是同一个,队列为空
            if ((h = whead) == (p = wtail)) {
                // 第一个无限循环内部自旋
                for (long m, s, ns;;) {
                    // 尝试获取读锁-
                    if ((m = (s = state) & ABITS) < RFULL ?
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                        return ns;
                    // 如果存在写锁
                    else if (m >= WBIT) {
                        // 自旋次数尝试减少
                        if (spins > 0) {
                            if (LockSupport.nextSecondarySeed() >= 0)
                                --spins;
                        }
                        else {
                            // 自旋次数用完
                            if (spins == 0) {
                                //h为自旋前的whead,p为自旋前的wtail
                                WNode nh = whead, np = wtail;
                                //如果nh = h && np = p,说明在自旋期间根本没有释放锁,或者队列不为空,跳出子for循环
                                if ((nh == h && np == p) || (h = nh) != (p = np))
                                    break;
                            }
                            spins = SPINS;
                        }
                    }
                }
            }
            // 尾结点为空,需要初始化队列
            if (p == null) { // initialize queue
                // 新建一个结点,若成功设置为首结点,那么尾结点此时也是它
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            else if (node == null)
                // 初始化结点,将结点加在队尾之后
                node = new WNode(RMODE, p);
            //如果队列为空,或者目前的尾结点是个写锁模式
            else if (h == p || p.mode != RMODE) {
                //确认将结点加在尾结点之后
                if (node.prev != p)
                    node.prev = p;
                // 将尾结点变为当前结点
                else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                    p.next = node;
                    // 跳出第一个自旋
                    break;
                }
            }
            // 逻辑到这 队列不为空且尾结点是个读模式的节点,尝试将node节点加到尾结点的cowait中
            else if (!U.compareAndSwapObject(p, WCOWAIT,
                                             node.cowait = p.cowait, node))
                node.cowait = null;
            else {
                // 插入前尾节点是读锁的情况,通过cowait加入,这时whead不是哨兵节点,而是读锁。当第一个读锁对象争夺锁的时候,会将whead=node的。
                for (;;) {
                    WNode pp, c; Thread w;
                    // 当首结点不为空且它的cowait不为空,依次唤醒它的cowait队列中的线程
                    if ((h = whead) != null && (c = h.cowait) != null &&
                        U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null) // help release
                        U.unpark(w);
                    // 当前结点的前驱节点(旧尾结点)变成首结点,或者当前结点变成首结点,尝试获取锁,当m<WBIT,直到存在写锁
                    if (h == (pp = p.prev) || h == p || pp == null) {
                        long m, s, ns;
                        do {
                            if ((m = (s = state) & ABITS) < RFULL ?
                                U.compareAndSwapLong(this, STATE, s,
                                                     ns = s + RUNIT) :
                                (m < WBIT &&
                                 (ns = tryIncReaderOverflow(s)) != 0L))
                                return ns;
                        } while (m < WBIT);
                    }
                    // 如果之前自旋期间没有锁释放,就开始尝试挂起了
                    if (whead == h && p.prev == pp) {
                        long time;
                        if (pp == null || h == p || p.status > 0) {
                            node = null; // throw away
                            break;
                        }
                        if (deadline == 0L)
                            time = 0L;
                        else if ((time = deadline - System.nanoTime()) <= 0L)
                            return cancelWaiter(node, p, false);
                        Thread wt = Thread.currentThread();
                        U.putObject(wt, PARKBLOCKER, this);
                        node.thread = wt;
                        if ((h != pp || (state & ABITS) == WBIT) &&
                            whead == h && p.prev == pp)
                            U.park(false, time);
                        node.thread = null;
                        U.putObject(wt, PARKBLOCKER, null);
                        // 出现中断 取消等待
                        if (interruptible && Thread.interrupted())
                            return cancelWaiter(node, p, true);
                    }
                }
            }
        }
        
        // 插入前的尾节点是写锁
        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            // 当前结点是首结点,使用首部自旋
            if ((h = whead) == p) {
                if (spins < 0)
                    //设置首部自旋次数
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    // 累加自旋次数
                    spins <<= 1;
                for (int k = spins;;) { // 首部自旋
                    long m, s, ns;
                    // 1.当前读锁没有超出限制,获取读锁成功
                    // 2.当前读锁超出限制,但是没有写锁,尝试超分读成功
                    if ((m = (s = state) & ABITS) < RFULL ?
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                        WNode c; Thread w;
                        // 将当前结点变为首结点
                        whead = node;
                        node.prev = null;
                        // 唤醒当前结点中cowait的队列中的线程
                        while ((c = node.cowait的队列中的线程) != null) {
                            if (U.compareAndSwapObject(node, WCOWAIT,
                                                       c, c.cowait) &&
                                (w = c.thread) != null)
                                U.unpark(w);
                        }
                        return ns;
                    }
                    // 如果存在写锁,自旋次数用尽,跳出首部自旋
                    else if (m >= WBIT &&
                             LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                        break;
                }
            }
            // 首结点不为空 尝试唤醒首结点中的cowait队列的线程
            else if (h != null) {
                WNode c; Thread w;
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            // 没有将node设置为whead,就该挂起线程了
            if (whead == h) {
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    long time;
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 &&
                        (p != h || (state & ABITS) == WBIT) &&
                        whead == h && node.prev == p)
                        U.park(false, time);
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }

tryIncReaderOverflow(long s)

    private long tryIncReaderOverflow(long s) {
        // assert (s & ABITS) >= RFULL;
        // 读锁个数满了,
        if ((s & ABITS) == RFULL) {
            if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
                // 额外读的个数加1
                ++readerOverflow;
                state = s;
                return s;
            }
        }
        // 线程让步
        else if ((LockSupport.nextSecondarySeed() &
                  OVERFLOW_YIELD_RATE) == 0)
            Thread.yield();
        return 0L;
    }

读锁释放

unlockRead(long stamp)

    public void unlockRead(long stamp) {
        long s, m; WNode h;
        for (;;) {
            //将stamp与state检验,如果不符合就说明出现异常了,抛出异常。
            if (((s = state) & SBITS) != (stamp & SBITS) ||
                (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
                throw new IllegalMonitorStateException();
            // 读锁的个数在合理范围内
            if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                    if (m == RUNIT && (h = whead) != null && h.status != 0)
                        release(h);
                    break;
                }
            }
            // 存在超读的情况 尝试减少超读
            else if (tryDecReaderOverflow(s) != 0L)
                break;
        }
    }

tryDecReaderOverflow(long s)

    private long tryDecReaderOverflow(long s) {
        // assert (s & ABITS) >= RFULL;
        if ((s & ABITS) == RFULL) {
            if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
                int r; long next;
                if ((r = readerOverflow) > 0) {
                    readerOverflow = r - 1;
                    next = s;
                }
                else
                    next = s - RUNIT;
                 state = next;
                 return next;
            }
        }
        // 线程让步
        else if ((LockSupport.nextSecondarySeed() &
                  OVERFLOW_YIELD_RATE) == 0)
            Thread.yield();
        return 0L;
    }

转换成写锁

    /*
    * 0代表失败
    */
	public long tryConvertToWriteLock(long stamp) {
        //a代表stamp时锁情况
        long a = stamp & ABITS, m, s, next;
        // 锁未发生变化
        while (((s = state) & SBITS) == (stamp & SBITS)) {
            // 条件成立代表没有写锁也没有读锁
            if ((m = s & ABITS) == 0L) {
                // 如果stamp代表有锁,则退出
                if (a != 0L)
                    break;
                // 尝试获取写锁,获取成功这返回
                if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                    return next;
            }
            // 此时已获取写锁
            else if (m == WBIT) {
                // 若stamp时锁不一致则退出
                if (a != m)
                    break;
                return stamp;
            }
            // 拥有读锁 释放读锁 增加写锁
            else if (m == RUNIT && a != 0L) {
                if (U.compareAndSwapLong(this, STATE, s,
                                         next = s - RUNIT + WBIT))
                    return next;
            }
            else
                break;
        }
        return 0L;
    }

转换成读锁

    public long tryConvertToReadLock(long stamp) {
        long a = stamp & ABITS, m, s, next; WNode h;
        // 
        while (((s = state) & SBITS) == (stamp & SBITS)) {
            if ((m = s & ABITS) == 0L) {
                if (a != 0L)
                    break;
                // 读锁未超读,尝试获取读锁
                else if (m < RFULL) {
                    if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                        return next;
                }
                // 尝试超读
                else if ((next = tryIncReaderOverflow(s)) != 0L)
                    return next;
            }
            else if (m == WBIT) {
                if (a != m)
                    break;
                // 在拥有写锁的情况下,在获取读锁
                state = next = s + (WBIT + RUNIT);
                // 释放写锁
                if ((h = whead) != null && h.status != 0)
                    release(h);
                return next;
            }
            // 已经是没有写锁,只有读锁的情况
            else if (a != 0L && a < WBIT)
                return stamp;
            else
                break;
        }
        return 0L;
    }

转换成乐观锁锁

    public long tryConvertToOptimisticRead(long stamp) {
        long a = stamp & ABITS, m, s, next; WNode h;
        // 内存屏障
        U.loadFence();
        // 自旋
        for (;;) {
            // 锁异常 直接退出
            if (((s = state) & SBITS) != (stamp & SBITS))
                break;
            // 此时既没有读锁也没有写锁 与stamp情况一直时则直接成功返回,否则失败退出
            if ((m = s & ABITS) == 0L) {
                if (a != 0L)
                    break;
                return s;
            }
            // 只拥有写锁
            else if (m == WBIT) {
                if (a != m)
                    break;
                // 什么情况下,加上写锁还为0?
                state = next = (s += WBIT) == 0L ? ORIGIN : s;
                // 尝试释放首结点
                if ((h = whead) != null && h.status != 0)
                    release(h);
                return next;
            }
            // 既有写锁又有读锁 就不转换看
            else if (a == 0L || a >= WBIT)
                break;
            // 读多未超读
            else if (m < RFULL) {
                // 唤醒首结点
                if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) {
                    if (m == RUNIT && (h = whead) != null && h.status != 0)
                        release(h);
                    return next & SBITS;
                }
            }
            // 读锁超读
            else if ((next = tryDecReaderOverflow(s)) != 0L)
                return next & SBITS;
        }
        return 0L;
    }

遗留问题

中断问题导致CPU爆满,在保存了电脑文件时候,运行下列代码对25行进行注释与非注释两中情况下运行。观察CPU使用,发现在t2中断的时候CPU的使用率增加了。

import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.StampedLock;

public class StampedLockDemo {
    public static void main(String[] args) throws InterruptedException {

        final StampedLock lock = new StampedLock();
        Thread t1 = new Thread(() -> {
            // 获取写锁
            lock.writeLock();
            // 模拟程序阻塞等待其他资源
            LockSupport.park();
        });
        t1.start();
        // 保证t1获取写锁
        Thread.sleep(100);
        Thread t2 = new Thread(() -> {
            // 阻塞在悲观读锁
            lock.readLock();
        });
        t2.start();
        // 保证t2阻塞在读锁
        Thread.sleep(100);
        // 中断线程t2,会导致线程t2所在CPU飙升
         t2.interrupt(); //不要轻易执行该行
        t2.join();
    }

}

通过死循环+CAS操作的方式来修改状态位,在挂起线程时,是通过unsafe.park的方式,而对于中断的线程,unsafe.park会直接返回,而在StampedLock的无限循环逻辑中,没有处理中断的逻辑,就会导致阻塞在park上的线程中断后,再次进入循环,直到当前死循环满足退出条件,因此整个过程会使cpu暴涨。

解法一:https://github.com/zuai/Hui/blob/master/StampedLock.java

在acquireRead(boolean interruptible, long deadline) 和acquireWrite(boolean interruptible, long deadline)中添加 保存/复原中断状态的机制。

//                    if (interruptible && Thread.interrupted())
//                        return cancelWaiter(node, node, true);
                    if(Thread.interrupted()){
                        if(interruptible)
                            return cancelWaiter(node, node, true);
                        else
                            interrupted = true;
                    }
举报

相关推荐

0 条评论