0
点赞
收藏
分享

微信扫一扫

【战、面试官】想春招比面试官薪资还高,看这篇ConcurrentHashMap就对了

前言

本文适看人群:

  • 最近想换工作的老铁;
  • 复习或者初学concurrentHashMap的兄弟;
  • 想挖穿concurrentHashMap源码的卷王。

在这里插入图片描述

以下内容,是本帝亲自反复阅读,debug源码得出的一些经验和结论,铁子们可以放心食用,童叟无欺阿~。

本文涉及知识:

  • cas乐观锁
  • synchronized锁升级
  • volatile关键字
  • 进制转化和运算

模拟面试

下面是面试常问的一些ConcurrentHashMap相关的问题。

废话少说,直接开干。

面试官:HashMap哪儿不行了,为什么要有ConcurrentHashMap?

:据我了解hashMap线程危险,ConcurrentHashMap线程安全

hashMap线程不安全的地方:

  1. jdk1.7的时候可能会产生环链和数据丢失;
  2. jdk1.8会存在数据覆盖的问题。

jdk1.8下的hashMap

  1. 覆盖头节点:线程a和线程b同时判断了同一个下标没有节点(为null)时,线程a时间片结束,线程b执行,线程b插入该下标一个node节点,接着线程a恢复执行,a就会将b插入的node节点直接覆盖了;
  2. 覆盖size:put操作完成后计算map中节点的个数采用的是++size,会出现线程a和b同时获取到当前的size=10,然后线程a +1 size = 11 ,在到线程b+1 size还是=11。

面试官:你知道concurrentHashMap是怎么保证线程安全的吗?

ConcurrentHashMap的很多变量都用volatile来修饰,保证多线程下的可见性,以及禁止指令重排序,但是volatile不保证原子性,所以引入了cas乐观锁以及synchronized关键字,来保证原子性。

面试官:它put的过程是怎么样的?

:先计算key的hash值,然后有一个for(;;;)循环作为cas的自旋操作,for的内部首先cas对table数组进行初始化,然后通过key的hash值和数组容量计算得出一个下标位置,对下标处的链表头结点加synchronized锁,然后在进行后续的操作。

面试官:为什么这两个地方要用不一样的锁,为什么这样设计呢?

cas是一个轻量级的锁,就是一个简单的比较并替换的过程,并且该过程指令在cpu级完成,所以它很高效。因为table的初始化只有一次,即便在多线程竞争激烈的情况下,只要有一个线程初始化成功,那么本次的竞争直接结束,就不会出现有线程一直抢不到锁而一直自旋浪费cpu的情况。为什么在头节点加synchronized呢?因为如果锁住整个表,其他线程都要等待其中一个线程put的所有操作完成之后才能执行put操作,程序运行的效率会大打则扣。如果加在每个节点上,当遍历链表时,每获取一个节点都要进行一次加锁解锁的操作,并且节点处的竞争并不激烈,这样更浪费性能。所以加在下标的头节点处,不会影响其他线程对其他下标的操作,又能够保证同一个下标下整个链表的线程安全。可以说加锁的粒度刚刚好。

面试官:concurrentHashMap是怎么扩容的?

:多线程扩容,putVal方法有两处扩容方法的入口,一个是在给头节点加锁前,判断当前是否有其他线程正在扩容,有的话就去帮助扩容。另一个是在节点添加完成后统计size的方法里面,判断当前容量是否需要扩容,或者去帮助扩容。扩容的时候采用分段扩容的方式,每段大小为16(最小为16),每个线程从后往前按段的大小获取自己要处理的一段数据进行转移。

例子:当前容量为64,有三个线程a,b,c,a第一个开始扩容,a会先创建一个原数组两倍大小的新数组,也就是容量为128的新数组,然后依次获取原数组下标63 ~ 47(63,62,61…这样获取)的元素,转移到新数组。这时b线程在put过程中发现有线程正在扩容,那么就会去帮助扩容,b线程会获取原数组下标47 ~ 31的元素进行转移。转移到新数组的哪个位置呢,它是根据key的hash值,经过简单的计算得到(而不需要rehash,这也是一个优化点,后面会讲),经过计算要么是放到 原数组下标位置,那么是放到 原数组下标 + 原数组容量 的位置。

面试官:concurrentHashMap是怎么记录size的?

:在putVal方法里面,put的key value已经被成功的放入map,然后就开始对size进行+1的操作,concurrentHashMap为了保证线程安全,使用了cas来对baseCount进行累加操作,为了防止多线程竞争激烈的情况下,有线程会一直cas失败一直自旋浪费cpu,又引入了一个
CounterCell[]对象数组,让竞争失败的线程在这个对象数组里面获取一个对象,然后增加对象里面的一个变量value的值(增加value的值也是使用cas的方式),最终整个map的size值就是baseCount + CounterCell[]数组中每个CounterCell对象value值的总和。这样设计的好处就是使用分段加锁的思想,将竞争分摊到每个CounterCell对象上,从而提高程序运行的效率。

面试官:concurrentHashMap的变量sizeCtl有什么作用?

:多线程之间,读取被volatile修饰的sizeCtl属性,来判断ConcurrentHashMap当前所处的状态。通过CAS设置sizeCtl属性,告知其他线程ConcurrentHashMap的状态变更。所以sizeCtl是标识ConcurrentHashMap状态的变量

不同状态,sizeCtl代表的含义也有所不同:

  • 未初始化:sizeCtl=0:表示没有指定初始容量。sizeCtl>0:表示初始容量。
  • 初始化中:sizeCtl=-1,标记作用,告知其他线程,正在初始化
  • 正常状态:sizeCtl=0.75n,扩容阈值
  • 扩容中:sizeCtl < 0 : 表示有其他线程正在执行扩容
  • sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT)+2 表示此时只有一个线程在执行扩容

面试结束后,面试官找到领导说道:“铭哥,今天这小伙子不错,之前来那些人,被我用ConcurrentHashMap两招就拿下了,今天这个跟我战的难分难解,我觉得就他了?”。领导:“小苟阿,忘了告诉你了,李总今早带了个人过来,把这个坑占了,现在不用招人了。” 面试官:“好的。” 面试官内心活动:“哈哈哈哈,太好了,都面了两个月了,今天终于结束了,今晚回去五黑庆祝一下”。

过了一周,我:“wc,不应该啊,这家我面挺好啊,基本都答上来了,什么情况 ❓ ❓ ❓” gg

哈哈哈,大家不要觉得我上面讲的事情是虚构的。我敢保证这类似的剧情肯定在现实中经常发生,艺术源于生活嘛。所以我想说,面试这个东西还是靠缘分的,大家不要因为面了很多家,或者面试感觉很好拿不到offer而气馁,这家不行出门右拐下一家嘛,保持平常心,坚持住,切记不要因为焦虑而随便面上一家就去了,你一定要相信,有更好的选择在后面等着你。

源码解析

我们主要关注put方法的源码。在正式介绍put方法前,我们先康康put方法里面调用到的一些方法。

1.计算hash值:

static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

在原hashMaphashCode值右移16位的基础上又 &了一个HASH_BITS(这是和HashMap不同的地方),7fffffff转化为二进制为:

0111 1111 1111 1111 1111 1111 1111 1111

最高位为0,这是为了让 (h ^ (h >>> 16)) & HASH_BITS 的值的最高位总是0,也就是保证最后得到的hash值总是正数(二进制的正负数由最高位来确定,最高位是0就是正数,最高位是1就是负数),因为ConcurrentHashMap中定义了一些hash值为负数的标识,所以正常计算出来的hash值要与之区分开来,就必须要为正数。

static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations

2.初始化(initTable方法)

put的时候,判断数组是null或者长度为0,就进行初始化操作

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //循环判断数组是否为空或者长度为0
    while ((tab = table) == null || tab.length == 0) {
        //如果sizeCtl小于0 代表其他线程正在初始化(sizeCtl=-1)
        //这里sizeCtl不可能小于-1,因为如果正在扩容就不会进入到这个逻辑里面
        //判断并且将sizeCtl赋值给sc
        if ((sc = sizeCtl) < 0)
            //如果有其他线程在初始化,当前线程就让出cpu
            Thread.yield(); // lost initialization race; just spin
        //如果没有其他线程在初始化,那么就cas争抢初始化的权利
        //也就是对比sizeCtl(内存值)和sc(预期值)是否相等,如果相等就将sizeCtl赋值为-1
        //并且返回true,反之返回false
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                //这里再次判断数组是否为null或长度为0
                //这里防止第一个线程初始化数组完成后,其他线程下一个循环进来重复初始化
                if ((tab = table) == null || tab.length == 0) {
                    //这里判断是使用默认容量还是开发者设定的初始容量
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    //创建数组
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    //计算扩容阈值(相当于总容量 * 0.75)
                    sc = n - (n >>> 2);
                }
            } finally {
                //将扩容阈值赋值给sizeCtl
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

小结ConcurrentHashMap的初始化操作,就是将数组初始化出来,如果没有设置初始大小,默认大小为16。初始化使用sizeCtl作为标识判断当前是否有其他线程正在初始化,使用cas乐观锁和双重检测机制,在保证线程安全的同时也保证了效率。

put k v(putVal方法)

put方法内部会调用putVal方法

public V put(K key, V value) {
        return putVal(key, value, false);
    }

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());

    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            //初始化
            tab = initTable();
        //如果当前下标=null
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //那么就直接cas乐观锁去存放头节点
            //这里为什么不用synchronized,我的理解是因为存放头节点的操作简单快速(也就是其他线程自旋的时间短)
            //不浪费cpu,而且资源竞争没有那么激烈,因为每个下标的头节点只会设置一次。
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //如果当前头节点的hash值为MOVED=-1,那么就代表
        //当前节点已经迁移,而且正在扩容
        else if ((fh = f.hash) == MOVED)
            //就去帮助扩容
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //这里是判断了如果头节点不是空,就加synchronized
            //这里加synchronized的原因是当前代码块的操作逻辑复杂
            //(这里主要逻辑有对链表的操作,或者对红黑树的操作)
            //执行相对较慢,而且hash冲突高一些的时候资源竞争激烈
            //这时如果用cas,就会导致可能其他线程自旋很久都拿不到锁
            //导致cpu的浪费
            //这里只是锁住某一个下标的头节点,不会影响其他下标的操作。
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //hash值大于等于0 代表当前节点是链表结构
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //否则判断是红黑树
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                //判断节点的长度是否达到8
                if (binCount >= TREEIFY_THRESHOLD)
                    //进行红黑树结构的变换
                    //当然里面还要再判断数组容量是否达到了64
                    //没有达到64,就会采用扩容的方式来解决
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //重新计算总的元素个数,也就是size
    //binCount,记录了key value所在下标的链表长度或者红黑树的元素个数。
    addCount(1L, binCount);
    return null;

小结:putVal方法使用一个for (Node<K,V>[] tab = table;;)无限循环来作为cas的自旋操作。通过hash值&n获取下标后,先判断该下标处是否为null,如果为null,直接放入该下标,break出循环。如果有头结点,就取得头结点的hash值,如果其hash值==MOVED,说明这个头结点是一个转移节点,就去先帮助扩容。否则就直接给头结点加锁,最终将key value 产生的新Node对象放入链表或者红黑树。最后将size累加1。

加油,优秀和普通的差别就是能别人所不能,比如继续阅读别人看不下去的代码。☔ ⛅ 🌈

size累加1(addCount方法)

累加1到baseCount或者CounterCell[]中某个对象的value中。

private transient volatile long baseCount;

    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

addCount方法解析

private final void addCount(long x, int check) {
        ConcurrentHashMap.CounterCell[] as; long b, s;
        //判断counterCells是否为空,
        //1).不管是否为空,都会先通过cas操作尝试修改baseCount变量,对这个变量进行原子累加操作(注:如果在没有竞争的情况下,仍然采用baseCount来记录元素个数)
        //2).如果本次cas记录失败说明存在竞争,就不使用baseCount来累加,而是使用CounterCell[]这个对象数组来记录
        //理解:其实就是先cas给baseCount累加,如果累加失败,就表示有竞争,然后转而使用CounterCell[]这种分段加锁的方式
        //来提高多线程竞争激烈情况下的处理效率,juc下另一个工具LongAdder的实现方式与之类似。
        if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            ConcurrentHashMap.CounterCell a; long v; int m;
            boolean uncontended = true;
            //这里有几个判断
            //1. 计数表为空则直接调用fullAddCount
            //2. 从计数表中随机取出一个数组的位置为空,直接调用fullAddCount
            //3. 通过CAS修改CounterCell随机位置的值,如果修改失败说明出现并发情况(这里又用到了一种巧妙的方法),调用fullAndCount
            //Random在线程并发的时候会有性能问题以及可能会产生相同的随机数,ThreadLocalRandom.getProbe可以解决这个问题,并且性能要比Random高
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                //CounterCell[]初始化默认容量为2,并填入两个value值为0的CounterCell对象
                fullAddCount(x, uncontended);
                return;
            }
            /**
             * if (check <= 1)
             * 执行这个代码的前提是程序进入这个判断if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x))
             * 进入这个判断代表当前counterCells!=null 或者 cas原子累加失败--线程竞争激烈,竞争这么激烈的情况下,如果当前节点的链表长度还小于等于1,那么证明当前剩余空间还很多
             * 本次就直接返回,都不需要去检查是否扩容
             */
            if (check <= 1)
                //如果链表长度小于等于1,直接就不考虑扩容
                return;
            s = sumCount();
        }
        //如果链表长度大于等于0,检查是否需要扩容,或帮助扩容
        if (check >= 0) {
            ConcurrentHashMap.Node<K,V>[] tab, nt; int n, sc;
            //s标识集合大小,如果集合大小大于或等于扩容阈值(默认值的0.75)
            //并且table不为空并且table的长度小于最大容量
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                    (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    //这5个条件只要有一个条件为true,说明当前线程不能帮助进行此次的扩容,直接跳出循环
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    //这里是帮助扩容
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // 本次第一个线程去扩容
                // 如果当前没有在扩容,那么rs肯定是一个正数,通过rs<<RESIZE_STAMP_SHIFT 将sc设置为一个负数,+2 表示有一个线程在执行扩容
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

我们可以看一下ConcurrentHashMap统计size大小的方法:可以看出size=baseCount + CounterCell[]中所有对象的value总和。

final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

size累加图解

在这里插入图片描述

小结:最上面先是对size累加的操作,先执行一次casbaseCount累加,如果累加成功,证明竞争不激烈,直接到下面判断扩容。如果累加失败,证明竞争激烈,就将数值1累加到CounterCell[]CounterCell对象中,通过as[ThreadLocalRandom.getProbe() & m]来获取一个随机数,并保证了多线程下也不会撞数。尽量让不同的线程获取CounterCell[]不同的下标,从而减少争抢,提高效率。在检查扩容的逻辑里面需要注意sizeCtl=rs << RESIZE_STAMP_SHIFT) + 2表示当前只有一个线程扩容,后面每新来一个线程帮助扩容sizeCtl就+1,减少一个sizeCtl就-1。

4.扩容机制(transfer方法)

下面介绍concurrentHashMap的扩容机制,多线程分段扩容,从后往前执行。在原数组转移一个节点,就将原数组的那个节点设置为一个转移节点(ForwardingNode),转移节点的hash值为-1,也就是MOVED

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //stride 步长(也就是扩容时,每个线程分配的桶数,最小为16)
        //根据cpu核数计算步长
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                //初始化一个原数组两倍大小的新数组
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //初始化 转移节点 其hash值为MOVED=-1
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        //i为老数组当前转移位置的下标,bound为边界,标识当前线程转移到什么位置停止本次的转移(又要重新cas获取转移区间)
        //也就是会转移老数组下标[bound,i]区间内的节点,每cas一次transferIndex就会-16
        // (这样就保证了后面来的线程不会和前面的线程获取到同一个区间)
        //例如:当前i=32(表示当前线程正在转移老数组下标为32的节点),每转移一次i-1(从后往前的一个转移顺序)
        //bound=16,代表当前线程从下标为32的节点开始往前转移,直到下标为16,当前线程本次转移结束。
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //当本次[bound,i]区间内的节点转移完毕,又重cas获取新的区间,
                else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                /**
                 这里是最后一个线程退出transfer方法的地方
                 在退出前会,最后一个线程会从老数组的[0,length]区间再执行一遍节点转移
                 猜测是为了保证所有的节点都转移成功了,如果在区间转移的过程中遇到转移节点
                 那么会跳过。
                 */
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                /**
                 第一个扩容的线程,进入transfer方法前,会执行 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
                 后面帮助扩容的线程,进入transfer方法前,会执行 sizeCtl = sizeCtl+1
                 每一个退出transfer方法的线程,退出之前,会执行 sizeCtl = sizeCtl-1也就是下面的cas代码 U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)
                 所以直到最后一个线程退出时:
                 必定有 sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),也就是下面的判断 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
                 如果不相等,说明不到最后一个线程,直接return出transfer方法
                 */
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    //最后一个线程就是从这里将原数组长度n赋值给i,然后又重新遍历检查一遍。
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                //判断如果是转移节点就跳过
                advance = true; // already processed
            else {
                //开始迁移节点(f是桶位上的头结点)
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            /** 将原来的一个链表拆成两个
                             * 一个放在原数组下标位置
                             * 另一个放在原数组下标+原数组长度的位置
                             */
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            //循环查找,找到链表中最后一个和它的上一个结点hash & n 所得值不同的节点
                            //作为lastRun,其hash & n值为runBit
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            //为下面的循环计算赋初值
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //循环组装两个链表(看情况,有可能最后有两个链表,
                            // 也可能只有一个链表,也可能就只有一个头结点)
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //将一个链表放到新数组(位置:原数组下标为)
                            setTabAt(nextTab, i, ln);
                            //将另一个链表放到新数组(位置:原数组下标+原数组长度)
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //迁移红黑树
                        else if (f instanceof ConcurrentHashMap.TreeBin) {
                            ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

小结:注意在每个线程转移完成后会退出该方法,每退出一个线程sizeCtl就会-1,直到还剩最后一个线程(程序中通过if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)判断当前transfer方法里面是否只剩一个线程了),这时最后一个线程会从老数组的末尾从后往前依次遍历整个老数组,检查老数组上所有的节点是否都被转移完成了(检查是否都是转移节点ForwardingNode),检查完没有遗漏的节点,最后一个线程才会退出transfer方法。

扩容图解

在这里插入图片描述
在这里插入图片描述

补充知识点

ConcurrentHashMap和HashMap初始容量计算方式不同

初始容量的算法不同:

设置初始容量的计算方式在HashMap的基础上增加了一个计算,就是将你传进来的数值乘上1.5后再进行hashMap的初始容量计算,比如你传入32的值,会32乘1.5,进行hashMap的计算,最后计算出初始容量为64。会将64*0.75=48 这个扩容阈值赋值给sizeCtl。如果你没有进行容量的赋值,那么初始容量为默认16,这时sizeCtl=0
-= =-i

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               //这里的位运算相当于 initialCapacity * 1.5
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

为什么是转移到原数组位置或原数组+原数组长度的位置

先看看结论:扩容的时候节点转移,要么在原位,要么移动到(原位置+原数组长度)的位置,这个只是取决于,节点的hash值与老数组长度二进制数据最左边的那个1同位置上的值为0还是1,为0就在原位置,为1就放到(原位置+原数组长度)的位置),具体的算法就是使用节点的hash值和老数组长度的按位与(ph & n)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E8saqamZ-1645975130368)(BA82AC7F7AAF41369F427591A3C18EAA)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9GOshDDj-1645975130368)(D54B1184F2424685BDECDA9007285F0A)]

从上图可以看出,当前节点的hash=56318,老数组长度=256,新数组长度=512,当前节点在老数组的下标为254,通过计算我们可以得出节点会放到新数组的510下标位置。

通过下图我们可以看出通过(hash & oldCap) == 0 ? i : i + oldCap(newCap-1)& hash 这两种方式都可以计算出节点在新数组的下标位置,但是为什么扩容的时候使用前者而不使用后者呢?这里涉及到与运算的知识点,1&1=1,0&1=0,0&0=0,可以看出,如果当前获取的位值为1,那么还需要去判断&上的是0,还是1,如果当前获取的位值为0,那么直接可以得到结果为0,都不需要知道参与&运算的另一个值。所以可以得出结论,前者的性能肯定是更高的(因为它0多阿!)。

在这里插入图片描述
在这里插入图片描述

debug源码tips

看到这里应付一般面试基本没问题了,接下来属于卷王的内容。开个玩笑哈,其实说实话,看懂了但是不熟悉是不行的,忘的也比较快(你不要觉得自己是张无忌)。最好还是自己debug一遍。下面的内容分享一下我debug的经验,让兄弟们可以节约点时间。

首先ConcurrentHashMap是用于并发编程吧,debug之前要搞点多线程吧?嘿嘿,本帝给你准备好了。

1.代码定位

双击Shift,搜索ConcurrentHashMap,点击进入ConcurrentHashMap类。然后ctrl + f,在本类全局搜索putVal,定位到put代码位置。
在这里插入图片描述

分析查看的代码主要就是putVal,addCount,transfer,helpTransfer,fullAddCount这几个。

2.多线程下打断点

设置断点判断,如果想看除了main线程以外的其他所有线程:
!Thread.currentThread().getName().contains("main")

如果想看某个指定的线程:
Thread.currentThread().getName().contains("pool-1-thread-2")

在这里插入图片描述

3.查看扩容效果

可以在这里设置Thread.currentThread().getName().contains("pool-1-thread-2") && nextn == 256 nextn代表新数组长度。

在这里插入图片描述

然后在转移后代码处打断点,查看转移前后对比的效果。
这里断点也设置和上面相同的条件Thread.currentThread().getName().contains("pool-1-thread-2") && nextn == 256
在这里插入图片描述

通过按F9,查看扩容过程中老数组和新数组的数据,不断的变化,来加深对ConcurrentHashMap扩容机制的理解。
在这里插入图片描述
在这里插入图片描述

总结

看文章只是对程序学习起辅助作用,很多时候还是需要自己去阅读源码,自己去操作一下,才能将这块的知识掌握得更牢固。源码下面没有秘密,源码阅读多了你就会发现,在实际开发中有很多需求或者设计都可以借鉴源码里面的设计思想,或者一些代码。简单来说就是你赋值粘贴的功力又精进了。

比如多线程下获取不会撞数的随机数:
ThreadLocalRandom.getProbe()

        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }

获取当前系统的cpu核数,可以用来初始化某些场景下线程池的核心线程数:

/** Number of CPUS, to place bounds on some sizings */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

当你的项目里面哪天需要使用cas了,你也可以来参考它源码的使用方式,等等。。。

在这里插入图片描述

微信公众号「 袋鼠先生的客栈 」,有问题评论区见。如果你觉得我的分享对你有帮助,或者觉得我有点东西,就支持一下我这个初出茅庐的writer,三连,三连,三连~~。点赞👍 关注❤️ 分享👥

举报

相关推荐

0 条评论