0
点赞
收藏
分享

微信扫一扫

天天用lock,不好奇他到底怎么工作的吗 —从ReentrantLock 到AQS

ReentrantLock

我们经常用的 *ReentrantLock*是干什么的呢 我认为这是一个前台/门面(类似设计模式中的门面模式)根据我们的入参创建一个FairSync OR NonfairSync 。sync 担任锁的lock()和release()。

private final Sync sync;
 
    
 
     public ReentrantLock() {
 
         sync = new NonfairSync();
 
     }
 
  
 
     public ReentrantLock(boolean fair) {
 
         sync = fair ? new FairSync() : new NonfairSync();
 
     }

那有人可能就问了啥是公平锁(FairSync)? 啥是非公平锁(NonfairSync)?

就拿商场试吃举例子,前者就是大家都好好排队,后者是新来的看试吃小样还有,直接拿走不参与排队,那显然后面的人就会饥饿 啊。那非公平锁有什么意义呢。想象一下,当商场人满为患了,你去排到试吃的后面都要挤过来,挤过去。显然你在全局上影响了商场的客流动,如果你直接去 偷袭!(马保国音) 显然在商场全局上来说是最优的。

加锁

AQS入队

因为FairSync 和NonfairSync 差的不是很大, 我们就着重讲NonfairSync

那你说那我缺的这块FairSync谁给我补啊,想要就自己来拿( 指自己看源码) 维吉尔音

//java.util.concurrent.locks.ReentrantLock
 
     static final class NonfairSync extends Sync {
 
         private static final long serialVersionUID = 7316153563782823691L;
 
  
 
         final void lock() {
 
             if (compareAndSetState(0, 1))
 
                 setExclusiveOwnerThread(Thread.currentThread());
 
             else
 
                 acquire(1);
 
         }

可见如果CAS成功线程就直接获得锁了,不成功就走了 acquire() 因为Sync extends AbstractQueuedSynchronizer让我们来看看acquire()

// java.util.concurrent.locks.AbstractQueuedSynchronizer
 
      public final void acquire(int arg) {
 
         if (!tryAcquire(arg) &&
 
             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 
             selfInterrupt();
 
     }

tryAcquire() 获取锁失败进入AQS等待队列

AQS终于是露出鸡脚了acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

AQS(AbstractQueuedSynchronizer)抽象队列同步器,名字是不是很高大上,我们别管

就是商场老大爷、老大妈排队购物(先进先出的双向链表)。

让我们看看node具有的属性

static final class Node {
 
         // 共有锁?
 
         static final Node SHARED = new Node();
 
         // 独占锁?
 
         static final Node EXCLUSIVE = null;
 
  
 
         // 线程被取消
 
         static final int CANCELLED =  1;
 
         // 线程处于激活态
 
         static final int SIGNAL    = -1;
 
         // 线程在等待中
 
         static final int CONDITION = -2;
 
         /**
 
          * waitStatus value to indicate the next acquireShared should
 
          * unconditionally propagate
 
          */
 
         static final int PROPAGATE = -3;

让我们再看看addWaiter() 通过CAS确保成功加入最后一个节点。

private Node addWaiter(Node mode) {
 
         Node node = new Node(Thread.currentThread(), mode);
 
         // Try the fast path of enq; backup to full enq on failure
 
         Node pred = tail;
 
         if (pred != null) {
 
             node.prev = pred;
 
             if (compareAndSetTail(pred, node)) {
 
                 pred.next = node;
 
                 return node;
 
             }
 
         }
 
         enq(node);   //  对AQS进行初始化再加入
 
         return node;
 
     }

enq() 对队列进行初始化,添加一个虚拟节点(避免空指针)

private Node enq(final Node node) {
 
         for (;;) {
 
             Node t = tail;
 
             if (t == null) { // Must initialize
 
                 if (compareAndSetHead(new Node()))
 
                     tail = head;
 
             } else {
 
                 node.prev = t;
 
                 if (compareAndSetTail(t, node)) {
 
                     t.next = node;
 
                     return t;
 
                 }
 
             }
 
         }
 
     }

AQS出队

让我们回到 acquire()

public final void acquire(int arg) {
 
         if (!tryAcquire(arg) &&
 
             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 
             selfInterrupt();
 
     }

买菜大妈也挺急的,要排队就会催前面快点,于是拍拍前面的人,说往前催一下。(少数情况)前面的人也很急,看着时间来不及烧菜了,就自暴自弃,直接离开了,空出了位置。

final boolean acquireQueued(final Node node, int arg) {
 
         boolean failed = true;
 
         try {
 
             boolean interrupted = false;
 
             for (;;) {
 
                 final Node p = node.predecessor();
 
                 if (p == head && tryAcquire(arg)) {
 
                     setHead(node);
 
                     p.next = null; // help GC
 
                     failed = false;
 
                     return interrupted;
 
                 }
 
                 if (shouldParkAfterFailedAcquire(p, node) &&
 
                     parkAndCheckInterrupt())
 
                     interrupted = true;
 
             }
 
             
 
         // 外部中断,或线程取消等待
 
         } finally {
 
             if (failed)
 
                 cancelAcquire(node);
 
         }
 
     }

后面的人看到前面有空位,就往前走再催前面的人。看到前面的人已经在催前面的人,他就不催了,催玩之后自己就能待机了(干着急也没用)。

为什么会 看到前面的人已经在催前面的人 可能有两个节点被同时加入

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 
         int ws = pred.waitStatus;
 
         if (ws == Node.SIGNAL)// 前面的人已经在问了
 
             return true;
 
         if (ws > 0) {        // 取消节点,空出位置,往前挪
 
             do {
 
                 node.prev = pred = pred.prev;
 
             } while (pred.waitStatus > 0);
 
             pred.next = node;
 
         } else {
 
             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 
         }
 
         return false;
 
     }

解锁

我们来看看锁的释放队列队列为空则调用unparkSuccessor(h) ,为什么 waitState以等于0做标记,且看下文

public final boolean release(int arg) {
 
         if (tryRelease(arg)) {
 
             Node h = head;
 
             if (h != null && h.waitStatus != 0) // 检查AQS是否初始化,或队列是否为空
 
                 unparkSuccessor(h);
 
             return true;
 
         }
 
         return false;
 
     }

waitState等于0可简单看做,已经完成了他作为解锁信号的职责,同时这和 -1是不一样的,

-1 是未知的往前催(不知道前面好没好),0是肯定的说前面有一个空位,并且是head指针自发的,不会传递。

private void unparkSuccessor(Node node) {
 
     int ws = node.waitStatus;
 
     if (ws < 0)
 
         compareAndSetWaitStatus(node, ws, 0); // 重置 waitStatus为 0
 
     Node s = node.next;
 
     if (s == null || s.waitStatus > 0) {
 
         s = null;
 
         for (Node t = tail; t != null && t != node; t = t.prev) // 如果你观察到了这段的奇怪之处,我也没办法解释,看了文章也看到不是很明白,就不误导人了。相关内容在 java.util.concurrent.locks.AbstractQueuedSynchronizer#cancelAcquire 
 
             if (t.waitStatus <= 0)
 
                 s = t;
 
     }
 
     if (s != null)
 
         LockSupport.unpark(s.thread); // 唤醒下一个线程
 
 }

队列被 unpark() 唤醒,队伍可以向前移动了

如果觉得有帮到你

点个赞再走呗baby 🥰🥰🥰

举报

相关推荐

0 条评论