0
点赞
收藏
分享

微信扫一扫

高薪程序员&面试题精讲系列50之你知道ConcurrentHashMap是怎么进行扩容的吗?

登高且赋 2022-01-04 阅读 66

一. 面试题及剖析

1. 今日面试题

2. 题目剖析

壹哥在前面4篇文章中,给大家介绍了ConcurrentHashMap的通用功能、特点,以及JDK 7、8中ConcurrentHashMap的底层数据结构,还有ConcurrentHashMap中的核心属性等内容,文章链接如下:

高薪程序员&面试题精讲系列45之你熟悉ConcurrentHashMap吗?

高薪程序员&面试题精讲系列46之说说JDK7中ConcurrentHashMap的底层原理,有哪些数据结构

高薪程序员&面试题精讲系列47之说说JDK8中ConcurrentHashMap的底层原理,HashMap与ConcurrentHashMap有什么区别?

高薪程序员&面试题精讲系列48之说说JDK8中ConcurrentHashMap的sizeCtl是什么意思?最大容量、负载因子是多少?

高薪程序员&面试题精讲系列49之说说ConcurrentHashMap#put方法的源码及数据添加流程

从本文开始,壹哥给大家重点分析JDK 8中ConcurrentHashMap#transfer()方法的源码,重点讲解扩容机制,这一块是咱们面试时的高频考点。

二. transfer()扩容方法的实现逻辑【重点】

1. ConcurrentHashMap的扩容机制

我们在面试时,ConcurrentHashMap中的另一个常考点则是关于数组的扩容机制,所以接下来 壹哥 重点分析ConcurrentHashMap是如何进行扩容的。

当ConcurrentHashMap容量不足时,需要对table数组进行扩容,其扩容的基本逻辑其实跟HashMap是很像的,并且ConcurrentHashMap的扩容也是翻倍扩容,扩容后数组容量为原来的 2 倍。但由于它是支持并发扩容的,所以要更复杂一些,因为它支持多线程进行扩容操作时并没有加锁。

这样做的目的不仅仅是为了满足并发的要求,还是希望利用并发处理来减少扩容时所带来的时间影响。因为在扩容时,总是会涉及到从一个“数组”到另一个“数组”的拷贝操作,如果这个操作能够并发进行,那是非常高效的。

2. 扩容机制的实现步骤

ConcurrentHashMap扩容操作的实现步骤可以分为两个部分,这两部分如下:

2.1 单线程中构建nextTable数组

该数组的大体实现过程其实就是一个遍历、复制的过程。

该过程会根据运算得到需要遍历的次数i,然后利用tabAt()方法获得i位置上的元素,并进行如下判断:

遍历过所有的节点以后,就完成了数据的复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。

2.2 多线程中的节点遍历

在transfer()方法的源码中有一个判断,如果遍历到的节点是forward节点,就向后继续遍历,再加上给节点的上锁操作,就完成了多线程的控制。多线程遍历节点时,处理了一个节点,就把对应节点的值设置为forward;另一个线程看到forward后,就会向后遍历。这样交叉操作就完成了复制工作,而且还很好的解决了线程安全的问题。

3. transfer()扩容方法

transfer()扩容方法是一个非常高效且复杂的方法,在阅读具体的 transfer 源码之前,我们先来了解一下什么时候会触发ConcurrentHashMap的扩容操作,有以下两种情况下可能触发扩容操作:

4. transfer()源码

接下来我们看看transfer()方法的源码,看看ConcurrentHashMap是如何实现扩容的。

/**
* 一个过渡的table表  只有在扩容的时候才会使用
*/
private transient volatile Node<K,V>[] nextTable;

 /**
     * Moves and/or copies the nodes in each bin to new table. See
     * above for explanation.
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//构造一个nextTable对象 它的容量是原来的两倍
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//构造一个连节点指针 用于标志位
        boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //这个while循环体的作用就是在控制i--  通过i--可以依次遍历原hash表中的节点
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                 //如果所有的节点都已经完成复制工作  就把nextTable赋值给table 清空临时对象nextTable
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍  依然相当于现在容量的0.75倍
                    return;
                }
                //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //如果遍历到的节点为空 则放入ForwardingNode指针
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //如果遍历到ForwardingNode节点  说明这个点已经被处理过了 直接跳过  这里是控制并发扩容的核心
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
              //节点上锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //如果fh>=0 证明这是一个Node节点
                        if (fh >= 0) {
                            int runBit = fh & n;
                            //以下的部分在完成的工作是构造两个链表  一个是原链表  另一个是原链表的反序排列
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //在nextTable的i位置上插入一个链表
                            setTabAt(nextTab, i, ln);
                            //在nextTable的i+n的位置上插入另一个链表
                            setTabAt(nextTab, i + n, hn);
                            //在table的i位置上插入forwardNode节点  表示已经处理过该节点
                            setTabAt(tab, i, fwd);
                            //设置advance为true 返回到上面的while循环中 就可以执行i--操作
                            advance = true;
                        }
                        //对TreeBin对象进行处理  与上面的过程类似
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            //构造正序和反序两个链表
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //如果扩容后已经不再需要tree的结构 反向转换为链表结构
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                             //在nextTable的i位置上插入一个链表    
                            setTabAt(nextTab, i, ln);
                            //在nextTable的i+n的位置上插入另一个链表
                            setTabAt(nextTab, i + n, hn);
                             //在table的i位置上插入forwardNode节点  表示已经处理过该节点
                            setTabAt(tab, i, fwd);
                            //设置advance为true 返回到上面的while循环中 就可以执行i--操作
                            advance = true;
                        }
                    }
                }
            }
        }
    }

transfer()方法真的很复杂,相信很多人都没耐心一行行阅读下来,即使阅读了一时之间也搞不懂啥意思,所以 壹哥 给各位把上面的源码分拆解读一下。

5. transfer()源码解读【重点】

5.1 设定步长stride

transfer()方法中首先会设定一个步长stride,这个stride的最小值等于MIN_TRANSFER_STRIDE=16

int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    stride = MIN_TRANSFER_STRIDE; // subdivide range

5.2 对nextTab数组进行初始化

接着判断nextTab是否为空,如果传入的nextTab为空,就会对其进行初始化。外围会保证,第一个发起迁移的线程调用此方法时,nextTab=null,后面的线程再调用时nextTab就不再为null。然后再将nextTab赋值给全局变量nextTable。

if (nextTab == null) {// initiating
   try {
       @SuppressWarnings("unchecked")
       Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
       nextTab = nt;
    } catch (Throwable ex) {      // try to cope with OOME
         sizeCtl = Integer.MAX_VALUE;
         return;
    }
         nextTable = nextTab;
         transferIndex = n;
}

5.3 创建ForwardingNode节点

接下来新建了一个ForwardingNode节点,这个节点的哈希值=MOVED=-1。

int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab

5.4 执行一个死循环

然后会在一个死循环中进行状态的改变,这里可能会存在3种情况。

第1种情况,--i>=bound或者finshing是True:

对i减1以后,判断i是否大于边界的最小值bound。如果小于bound,则说明上次领取的一个区间任务已经完成,需要领取下一个区间任务。通常境况下,第一次进入循环时,i这个判断会无法通过,会执行下面的nextIndex赋值操作。

第2种情况,如果小于等于0,说明没有区间了,则将 i 改成 -1,且推进状态变成 false,就不再推进了,表示扩容已结束,当前线程可以退出了。这个 -1 会在下面的 if 块里判断,从而进入到完成状态。

第3种情况,就是去领取线程的任务区间,下限是bound,上限是i,bound=nextIndex-stride,i=nextIndex。

for (int i = 0, bound = 0;;) {
    Node<K,V> f; int fh;
    while (advance) {
        int nextIndex, nextBound;
        if (--i >= bound || finishing)
            advance = false;
        else if ((nextIndex = transferIndex) <= 0) {
            i = -1;
            advance = false;
        }
        else if (U.compareAndSwapInt
                 (this, TRANSFERINDEX, nextIndex,
                  nextBound = (nextIndex > stride ?
                               nextIndex - stride : 0))) {
            bound = nextBound;
            i = nextIndex - 1;
            advance = false;
        }
}

5.5 判断是否结束当前线程的扩容任务

如果 i 小于0,或者不在 tab数组的下标范围内,就按照上面的判断,领取最后一段区间任务,结束当前线程的扩容任务。

if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    if (finishing) {
      //将新的 nextTab 赋值给 table 属性,完成迁移
        nextTable = null;
        table = nextTab; // 更新 table
      //重新计算 sizeCtl:n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
        sizeCtl = (n << 1) - (n >>> 1);
        return;
    }
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        finishing = advance = true;
        i = n; // recheck before commit
    }
}

5.6 创建fwd节点进行占位

接下来判断,如果旧的tab中,i位置上的值为null,就会用CAS写入一个fwd节点进行占位。

else if ((f = tabAt(tab, i)) == null)
    advance = casTabAt(tab, i, null, fwd);

5.7 加锁进行数据添加

接下来这部分是扩容的关键代码。

else {// 到这里,说明这个位置有实际值了,且不是占位符。对这个节点上锁。为什么上锁,防止 putVal 的时候向链表插入数据
            synchronized (f) {
                // 判断 i 下标处的桶节点是否和 f 相同
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;// low, height 高位桶,低位桶
                    // 如果 f 的 hash 值大于 0 。TreeBin 的 hash 是 -2
                    if (fh >= 0) {
                        // 对老长度进行与运算(第一个操作数的的第n位于第二个操作数的第n位如果都是1,那么结果的第n为也为1,否则为0)
                        // 由于 Map 的长度都是 2 的次方(000001000 这类的数字),那么取于 length 只有 2 种结果,一种是 0,一种是1
                        //  如果是结果是0 ,Doug Lea 将其放在低位,反之放在高位,目的是将链表重新 hash,放到对应的位置上,让新的取于算法能够击中他。
                        int runBit = fh & n;
                        Node<K,V> lastRun = f; // 尾节点,且和头节点的 hash 值取于不相等
                        // 遍历这个桶
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            // 取于桶中每个节点的 hash 值
                            int b = p.hash & n;
                            // 如果节点的 hash 值和首节点的 hash 值取于结果不同
                            if (b != runBit) {
                                runBit = b; // 更新 runBit,用于下面判断 lastRun 该赋值给 ln 还是 hn。
                                lastRun = p; // 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环
                            }
                        }
                        if (runBit == 0) {// 如果最后更新的 runBit 是 0 ,设置低位节点
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun; // 如果最后更新的 runBit 是 1, 设置高位节点
                            ln = null;
                        }// 再次循环,生成两个链表,lastRun 作为停止条件,这样就是避免无谓的循环(lastRun 后面都是相同的取于结果)
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            // 如果与运算结果是 0,那么就还在低位
                            if ((ph & n) == 0) // 如果是0 ,那么创建低位节点
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else // 1 则创建高位
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 其实这里类似 hashMap 
                        // 设置低位链表放在新链表的 i
                        setTabAt(nextTab, i, ln);
                        // 设置高位链表,在原有长度上加 n
                        setTabAt(nextTab, i + n, hn);
                        // 将旧的链表设置成占位符
                        setTabAt(tab, i, fwd);
                        // 继续向后推进
                        advance = true;

上面的代码挺难理解,我们可以配合下图帮助理解。图中灰色的圆圈代表fh & n = 0,绿色的圆圈代表fh & n ! =0。经过一轮循环以后,LastRun=8,Runbit=0,那么ln=LastRun(8节点),hn=null。后面再经过一轮循环,这样就可以将ln低位链表和hn高位链表都获取到。

5.8 链表转为红黑树

最后一段代码,则是判断是否需要进行红黑树的转移。

else if (f instanceof TreeBin) {
    TreeBin<K,V> t = (TreeBin<K,V>)f;
    TreeNode<K,V> lo = null, loTail = null;
    TreeNode<K,V> hi = null, hiTail = null;
    int lc = 0, hc = 0;
    // 遍历
    for (Node<K,V> e = t.first; e != null; e = e.next) {
        int h = e.hash;
        TreeNode<K,V> p = new TreeNode<K,V>
            (h, e.key, e.val, null, null);
        // 和链表相同的判断,与运算 == 0 的放在低位
        if ((h & n) == 0) {
            if ((p.prev = loTail) == null)
                lo = p;
            else
                loTail.next = p;
            loTail = p;
            ++lc;
        } // 不是 0 的放在高位
        else {
            if ((p.prev = hiTail) == null)
                hi = p;
            else
                hiTail.next = p;
            hiTail = p;
            ++hc;
        }
    }
    // 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
    ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
        (hc != 0) ? new TreeBin<K,V>(lo) : t;
    hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
        (lc != 0) ? new TreeBin<K,V>(hi) : t;
    // 低位树
    setTabAt(nextTab, i, ln);
    // 高位数
    setTabAt(nextTab, i + n, hn);
    // 旧的设置成占位符
    setTabAt(tab, i, fwd);
    // 继续向后推进
    advance = true;
}
}
}

6. transfer()总结【重点】

这个transfer()方法真的太复杂啦,我们很难记得住,所以壹哥给大家做个简单的总结,我们只需记住transfer()中大致执行了以下几个功能即可:

三. 结语

本篇文章,壹哥 重点讲解了JDK 8版本ConcurrentHashMap#transfer()扩容方法的源码以及扩容机制的流程。接下来 壹哥 会再通过另一篇文章,给大家讲解 JDK 8中的ConcurrentHashMap#get()方法的源码,请做好准备哦。

举报

相关推荐

0 条评论