前言
本文适看人群:
- 最近想换工作的老铁;
- 复习或者初学
concurrentHashMap
的兄弟; - 想挖穿
concurrentHashMap
源码的卷王。
以下内容,是本帝亲自反复阅读,debug源码得出的一些经验和结论,铁子们可以放心食用,童叟无欺阿~。
本文涉及知识:
cas
乐观锁synchronized
锁升级volatile
关键字- 进制转化和运算
模拟面试
下面是面试常问的一些ConcurrentHashMap相关的问题。
废话少说,直接开干。
面试官:HashMap哪儿不行了,为什么要有ConcurrentHashMap?
我:据我了解hashMap
线程危险,ConcurrentHashMap
线程安全
hashMap
线程不安全的地方:
jdk1.7
的时候可能会产生环链和数据丢失;jdk1.8
会存在数据覆盖的问题。
jdk1.8
下的hashMap
:
- 覆盖头节点:线程a和线程b同时判断了同一个下标没有节点(为
null
)时,线程a时间片结束,线程b执行,线程b插入该下标一个node节点,接着线程a恢复执行,a就会将b插入的node节点直接覆盖了; - 覆盖
size:put
操作完成后计算map
中节点的个数采用的是++size,会出现线程a和b同时获取到当前的size=10,然后线程a +1 size = 11 ,在到线程b+1 size还是=11。
面试官:你知道concurrentHashMap是怎么保证线程安全的吗?
我:ConcurrentHashMap
的很多变量都用volatile
来修饰,保证多线程下的可见性,以及禁止指令重排序,但是volatile
不保证原子性,所以引入了cas
乐观锁以及synchronized
关键字,来保证原子性。
面试官:它put的过程是怎么样的?
我:先计算key的hash
值,然后有一个for(;;;)
循环作为cas
的自旋操作,for的内部首先cas
对table数组进行初始化,然后通过key的hash值和数组容量计算得出一个下标位置,对下标处的链表头结点加synchronized
锁,然后在进行后续的操作。
面试官:为什么这两个地方要用不一样的锁,为什么这样设计呢?
我:cas
是一个轻量级的锁,就是一个简单的比较并替换的过程,并且该过程指令在cpu级完成,所以它很高效。因为table的初始化只有一次,即便在多线程竞争激烈的情况下,只要有一个线程初始化成功,那么本次的竞争直接结束,就不会出现有线程一直抢不到锁而一直自旋浪费cpu的情况。为什么在头节点加synchronized
呢?因为如果锁住整个表,其他线程都要等待其中一个线程put的所有操作完成之后才能执行put
操作,程序运行的效率会大打则扣。如果加在每个节点上,当遍历链表时,每获取一个节点都要进行一次加锁解锁的操作,并且节点处的竞争并不激烈,这样更浪费性能。所以加在下标的头节点处,不会影响其他线程对其他下标的操作,又能够保证同一个下标下整个链表的线程安全。可以说加锁的粒度刚刚好。
面试官:concurrentHashMap是怎么扩容的?
我:多线程扩容,putVal
方法有两处扩容方法的入口,一个是在给头节点加锁前,判断当前是否有其他线程正在扩容,有的话就去帮助扩容。另一个是在节点添加完成后统计size的方法里面,判断当前容量是否需要扩容,或者去帮助扩容。扩容的时候采用分段扩容的方式,每段大小为16(最小为16),每个线程从后往前按段的大小获取自己要处理的一段数据进行转移。
例子:当前容量为64,有三个线程a,b,c,a第一个开始扩容,a会先创建一个原数组两倍大小的新数组,也就是容量为128的新数组,然后依次获取原数组下标63 ~ 47(63,62,61…这样获取)的元素,转移到新数组。这时b线程在put过程中发现有线程正在扩容,那么就会去帮助扩容,b线程会获取原数组下标47 ~ 31的元素进行转移。转移到新数组的哪个位置呢,它是根据key的hash值,经过简单的计算得到(而不需要rehash,这也是一个优化点,后面会讲),经过计算要么是放到 原数组下标位置,那么是放到 原数组下标 + 原数组容量 的位置。
面试官:concurrentHashMap是怎么记录size的?
我:在putVal
方法里面,put的key value已经被成功的放入map,然后就开始对size进行+1的操作,concurrentHashMap
为了保证线程安全,使用了cas
来对baseCount
进行累加操作,为了防止多线程竞争激烈的情况下,有线程会一直cas
失败一直自旋浪费cpu,又引入了一个
CounterCell[]
对象数组,让竞争失败的线程在这个对象数组里面获取一个对象,然后增加对象里面的一个变量value的值(增加value的值也是使用cas的方式),最终整个map的size值就是baseCount + CounterCell[]数组中每个CounterCell对象value值的总和。这样设计的好处就是使用分段加锁的思想,将竞争分摊到每个CounterCell
对象上,从而提高程序运行的效率。
面试官:concurrentHashMap的变量sizeCtl有什么作用?
我:多线程之间,读取被volatile
修饰的sizeCtl
属性,来判断ConcurrentHashMap当前所处的状态。通过CAS
设置sizeCtl
属性,告知其他线程ConcurrentHashMap
的状态变更。所以sizeCtl是标识ConcurrentHashMap状态的变量。
不同状态,sizeCtl代表的含义也有所不同:
- 未初始化:sizeCtl=0:表示没有指定初始容量。sizeCtl>0:表示初始容量。
- 初始化中:sizeCtl=-1,标记作用,告知其他线程,正在初始化
- 正常状态:sizeCtl=0.75n,扩容阈值
- 扩容中:sizeCtl < 0 : 表示有其他线程正在执行扩容
sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT)+2
表示此时只有一个线程在执行扩容
面试结束后,面试官找到领导说道:“铭哥,今天这小伙子不错,之前来那些人,被我用ConcurrentHashMap两招就拿下了,今天这个跟我战的难分难解,我觉得就他了?”。领导:“小苟阿,忘了告诉你了,李总今早带了个人过来,把这个坑占了,现在不用招人了。” 面试官:“好的。” 面试官内心活动:“哈哈哈哈,太好了,都面了两个月了,今天终于结束了,今晚回去五黑庆祝一下”。
过了一周,我:“wc,不应该啊,这家我面挺好啊,基本都答上来了,什么情况 ❓ ❓ ❓” gg
哈哈哈,大家不要觉得我上面讲的事情是虚构的。我敢保证这类似的剧情肯定在现实中经常发生,艺术源于生活嘛。所以我想说,面试这个东西还是靠缘分的,大家不要因为面了很多家,或者面试感觉很好拿不到offer而气馁,这家不行出门右拐下一家嘛,保持平常心,坚持住,切记不要因为焦虑而随便面上一家就去了,你一定要相信,有更好的选择在后面等着你。
源码解析
我们主要关注put方法的源码。在正式介绍put方法前,我们先康康put方法里面调用到的一些方法。
1.计算hash值:
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
在原hashMap
将hashCode
值右移16位的基础上又 &
了一个HASH_BITS
(这是和HashMap
不同的地方),7fffffff转化为二进制为:
0111 1111 1111 1111 1111 1111 1111 1111
最高位为0,这是为了让 (h ^ (h >>> 16)) & HASH_BITS
的值的最高位总是0,也就是保证最后得到的hash
值总是正数(二进制的正负数由最高位来确定,最高位是0就是正数,最高位是1就是负数),因为ConcurrentHashMap
中定义了一些hash
值为负数的标识,所以正常计算出来的hash
值要与之区分开来,就必须要为正数。
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
2.初始化(initTable方法)
put的时候,判断数组是null或者长度为0,就进行初始化操作
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//循环判断数组是否为空或者长度为0
while ((tab = table) == null || tab.length == 0) {
//如果sizeCtl小于0 代表其他线程正在初始化(sizeCtl=-1)
//这里sizeCtl不可能小于-1,因为如果正在扩容就不会进入到这个逻辑里面
//判断并且将sizeCtl赋值给sc
if ((sc = sizeCtl) < 0)
//如果有其他线程在初始化,当前线程就让出cpu
Thread.yield(); // lost initialization race; just spin
//如果没有其他线程在初始化,那么就cas争抢初始化的权利
//也就是对比sizeCtl(内存值)和sc(预期值)是否相等,如果相等就将sizeCtl赋值为-1
//并且返回true,反之返回false
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//这里再次判断数组是否为null或长度为0
//这里防止第一个线程初始化数组完成后,其他线程下一个循环进来重复初始化
if ((tab = table) == null || tab.length == 0) {
//这里判断是使用默认容量还是开发者设定的初始容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//创建数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算扩容阈值(相当于总容量 * 0.75)
sc = n - (n >>> 2);
}
} finally {
//将扩容阈值赋值给sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}
小结:ConcurrentHashMap
的初始化操作,就是将数组初始化出来,如果没有设置初始大小,默认大小为16。初始化使用sizeCtl
作为标识判断当前是否有其他线程正在初始化,使用cas
乐观锁和双重检测机制,在保证线程安全的同时也保证了效率。
put k v(putVal方法)
put
方法内部会调用putVal
方法
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//初始化
tab = initTable();
//如果当前下标=null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//那么就直接cas乐观锁去存放头节点
//这里为什么不用synchronized,我的理解是因为存放头节点的操作简单快速(也就是其他线程自旋的时间短)
//不浪费cpu,而且资源竞争没有那么激烈,因为每个下标的头节点只会设置一次。
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果当前头节点的hash值为MOVED=-1,那么就代表
//当前节点已经迁移,而且正在扩容
else if ((fh = f.hash) == MOVED)
//就去帮助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//这里是判断了如果头节点不是空,就加synchronized
//这里加synchronized的原因是当前代码块的操作逻辑复杂
//(这里主要逻辑有对链表的操作,或者对红黑树的操作)
//执行相对较慢,而且hash冲突高一些的时候资源竞争激烈
//这时如果用cas,就会导致可能其他线程自旋很久都拿不到锁
//导致cpu的浪费
//这里只是锁住某一个下标的头节点,不会影响其他下标的操作。
synchronized (f) {
if (tabAt(tab, i) == f) {
//hash值大于等于0 代表当前节点是链表结构
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//否则判断是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//判断节点的长度是否达到8
if (binCount >= TREEIFY_THRESHOLD)
//进行红黑树结构的变换
//当然里面还要再判断数组容量是否达到了64
//没有达到64,就会采用扩容的方式来解决
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//重新计算总的元素个数,也就是size
//binCount,记录了key value所在下标的链表长度或者红黑树的元素个数。
addCount(1L, binCount);
return null;
小结:putVal方法使用一个for (Node<K,V>[] tab = table;;)
无限循环来作为cas的自旋操作。通过hash值&n获取下标后,先判断该下标处是否为null,如果为null,直接放入该下标,break出循环。如果有头结点,就取得头结点的hash值,如果其hash值==MOVED
,说明这个头结点是一个转移节点,就去先帮助扩容。否则就直接给头结点加锁,最终将key value 产生的新Node
对象放入链表或者红黑树。最后将size累加1。
加油,优秀和普通的差别就是能别人所不能,比如继续阅读别人看不下去的代码。☔ ⛅ 🌈
size累加1(addCount方法)
累加1到baseCount
或者CounterCell[]
中某个对象的value
中。
private transient volatile long baseCount;
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
addCount方法解析
private final void addCount(long x, int check) {
ConcurrentHashMap.CounterCell[] as; long b, s;
//判断counterCells是否为空,
//1).不管是否为空,都会先通过cas操作尝试修改baseCount变量,对这个变量进行原子累加操作(注:如果在没有竞争的情况下,仍然采用baseCount来记录元素个数)
//2).如果本次cas记录失败说明存在竞争,就不使用baseCount来累加,而是使用CounterCell[]这个对象数组来记录
//理解:其实就是先cas给baseCount累加,如果累加失败,就表示有竞争,然后转而使用CounterCell[]这种分段加锁的方式
//来提高多线程竞争激烈情况下的处理效率,juc下另一个工具LongAdder的实现方式与之类似。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
ConcurrentHashMap.CounterCell a; long v; int m;
boolean uncontended = true;
//这里有几个判断
//1. 计数表为空则直接调用fullAddCount
//2. 从计数表中随机取出一个数组的位置为空,直接调用fullAddCount
//3. 通过CAS修改CounterCell随机位置的值,如果修改失败说明出现并发情况(这里又用到了一种巧妙的方法),调用fullAndCount
//Random在线程并发的时候会有性能问题以及可能会产生相同的随机数,ThreadLocalRandom.getProbe可以解决这个问题,并且性能要比Random高
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//CounterCell[]初始化默认容量为2,并填入两个value值为0的CounterCell对象
fullAddCount(x, uncontended);
return;
}
/**
* if (check <= 1)
* 执行这个代码的前提是程序进入这个判断if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x))
* 进入这个判断代表当前counterCells!=null 或者 cas原子累加失败--线程竞争激烈,竞争这么激烈的情况下,如果当前节点的链表长度还小于等于1,那么证明当前剩余空间还很多
* 本次就直接返回,都不需要去检查是否扩容
*/
if (check <= 1)
//如果链表长度小于等于1,直接就不考虑扩容
return;
s = sumCount();
}
//如果链表长度大于等于0,检查是否需要扩容,或帮助扩容
if (check >= 0) {
ConcurrentHashMap.Node<K,V>[] tab, nt; int n, sc;
//s标识集合大小,如果集合大小大于或等于扩容阈值(默认值的0.75)
//并且table不为空并且table的长度小于最大容量
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
//这5个条件只要有一个条件为true,说明当前线程不能帮助进行此次的扩容,直接跳出循环
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//这里是帮助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 本次第一个线程去扩容
// 如果当前没有在扩容,那么rs肯定是一个正数,通过rs<<RESIZE_STAMP_SHIFT 将sc设置为一个负数,+2 表示有一个线程在执行扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
我们可以看一下ConcurrentHashMap
统计size
大小的方法:可以看出size=baseCount + CounterCell[]
中所有对象的value
总和。
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
size累加图解:
小结:最上面先是对size
累加的操作,先执行一次cas
对baseCount
累加,如果累加成功,证明竞争不激烈,直接到下面判断扩容。如果累加失败,证明竞争激烈,就将数值1累加到CounterCell[]
的CounterCell
对象中,通过as[ThreadLocalRandom.getProbe() & m]
来获取一个随机数,并保证了多线程下也不会撞数。尽量让不同的线程获取CounterCell[]
不同的下标,从而减少争抢,提高效率。在检查扩容的逻辑里面需要注意sizeCtl=rs << RESIZE_STAMP_SHIFT) + 2
表示当前只有一个线程扩容,后面每新来一个线程帮助扩容sizeCtl
就+1,减少一个sizeCtl
就-1。
4.扩容机制(transfer方法)
下面介绍concurrentHashMap
的扩容机制,多线程分段扩容,从后往前执行。在原数组转移一个节点,就将原数组的那个节点设置为一个转移节点(ForwardingNode
),转移节点的hash值为-1,也就是MOVED
。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//stride 步长(也就是扩容时,每个线程分配的桶数,最小为16)
//根据cpu核数计算步长
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
//初始化一个原数组两倍大小的新数组
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//初始化 转移节点 其hash值为MOVED=-1
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
//i为老数组当前转移位置的下标,bound为边界,标识当前线程转移到什么位置停止本次的转移(又要重新cas获取转移区间)
//也就是会转移老数组下标[bound,i]区间内的节点,每cas一次transferIndex就会-16
// (这样就保证了后面来的线程不会和前面的线程获取到同一个区间)
//例如:当前i=32(表示当前线程正在转移老数组下标为32的节点),每转移一次i-1(从后往前的一个转移顺序)
//bound=16,代表当前线程从下标为32的节点开始往前转移,直到下标为16,当前线程本次转移结束。
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//当本次[bound,i]区间内的节点转移完毕,又重cas获取新的区间,
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
/**
这里是最后一个线程退出transfer方法的地方
在退出前会,最后一个线程会从老数组的[0,length]区间再执行一遍节点转移
猜测是为了保证所有的节点都转移成功了,如果在区间转移的过程中遇到转移节点
那么会跳过。
*/
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
/**
第一个扩容的线程,进入transfer方法前,会执行 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
后面帮助扩容的线程,进入transfer方法前,会执行 sizeCtl = sizeCtl+1
每一个退出transfer方法的线程,退出之前,会执行 sizeCtl = sizeCtl-1也就是下面的cas代码 U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)
所以直到最后一个线程退出时:
必定有 sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),也就是下面的判断 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
如果不相等,说明不到最后一个线程,直接return出transfer方法
*/
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
//最后一个线程就是从这里将原数组长度n赋值给i,然后又重新遍历检查一遍。
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
//判断如果是转移节点就跳过
advance = true; // already processed
else {
//开始迁移节点(f是桶位上的头结点)
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
/** 将原来的一个链表拆成两个
* 一个放在原数组下标位置
* 另一个放在原数组下标+原数组长度的位置
*/
int runBit = fh & n;
Node<K,V> lastRun = f;
//循环查找,找到链表中最后一个和它的上一个结点hash & n 所得值不同的节点
//作为lastRun,其hash & n值为runBit
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//为下面的循环计算赋初值
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//循环组装两个链表(看情况,有可能最后有两个链表,
// 也可能只有一个链表,也可能就只有一个头结点)
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//将一个链表放到新数组(位置:原数组下标为)
setTabAt(nextTab, i, ln);
//将另一个链表放到新数组(位置:原数组下标+原数组长度)
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//迁移红黑树
else if (f instanceof ConcurrentHashMap.TreeBin) {
ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
小结:注意在每个线程转移完成后会退出该方法,每退出一个线程sizeCtl
就会-1,直到还剩最后一个线程(程序中通过if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
判断当前transfer
方法里面是否只剩一个线程了),这时最后一个线程会从老数组的末尾从后往前依次遍历整个老数组,检查老数组上所有的节点是否都被转移完成了(检查是否都是转移节点ForwardingNode
),检查完没有遗漏的节点,最后一个线程才会退出transfer
方法。
扩容图解:
补充知识点
ConcurrentHashMap和HashMap初始容量计算方式不同:
初始容量的算法不同:
设置初始容量的计算方式在HashMap
的基础上增加了一个计算,就是将你传进来的数值乘上1.5后再进行hashMap
的初始容量计算,比如你传入32的值,会32乘1.5,进行hashMap
的计算,最后计算出初始容量为64。会将64*0.75=48 这个扩容阈值赋值给sizeCtl
。如果你没有进行容量的赋值,那么初始容量为默认16,这时sizeCtl=0
;
-= =-i
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
//这里的位运算相当于 initialCapacity * 1.5
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
为什么是转移到原数组位置或原数组+原数组长度的位置:
先看看结论:扩容的时候节点转移,要么在原位,要么移动到(原位置+原数组长度)的位置,这个只是取决于,节点的hash
值与老数组长度二进制数据最左边的那个1同位置上的值为0还是1,为0就在原位置,为1就放到(原位置+原数组长度)的位置),具体的算法就是使用节点的hash
值和老数组长度的按位与(ph & n)
:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E8saqamZ-1645975130368)(BA82AC7F7AAF41369F427591A3C18EAA)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9GOshDDj-1645975130368)(D54B1184F2424685BDECDA9007285F0A)]
从上图可以看出,当前节点的hash=56318,老数组长度=256,新数组长度=512,当前节点在老数组的下标为254,通过计算我们可以得出节点会放到新数组的510下标位置。
通过下图我们可以看出通过(hash & oldCap) == 0 ? i : i + oldCap
和(newCap-1)& hash
这两种方式都可以计算出节点在新数组的下标位置,但是为什么扩容的时候使用前者而不使用后者呢?这里涉及到与运算的知识点,1&1=1,0&1=0,0&0=0
,可以看出,如果当前获取的位值为1,那么还需要去判断&上的是0,还是1,如果当前获取的位值为0,那么直接可以得到结果为0,都不需要知道参与&运算的另一个值。所以可以得出结论,前者的性能肯定是更高的(因为它0多阿!)。
debug源码tips
看到这里应付一般面试基本没问题了,接下来属于卷王的内容。开个玩笑哈,其实说实话,看懂了但是不熟悉是不行的,忘的也比较快(你不要觉得自己是张无忌)。最好还是自己debug
一遍。下面的内容分享一下我debug
的经验,让兄弟们可以节约点时间。
首先ConcurrentHashMap
是用于并发编程吧,debug之前要搞点多线程吧?嘿嘿,本帝给你准备好了。
1.代码定位
双击Shift
,搜索ConcurrentHashMap
,点击进入ConcurrentHashMap
类。然后ctrl + f
,在本类全局搜索putVal
,定位到put
代码位置。
分析查看的代码主要就是putVal,addCount,transfer,helpTransfer,fullAddCount
这几个。
2.多线程下打断点
设置断点判断,如果想看除了main
线程以外的其他所有线程:
!Thread.currentThread().getName().contains("main")
如果想看某个指定的线程:
Thread.currentThread().getName().contains("pool-1-thread-2")
3.查看扩容效果
可以在这里设置Thread.currentThread().getName().contains("pool-1-thread-2") && nextn == 256
nextn代表新数组长度。
然后在转移后代码处打断点,查看转移前后对比的效果。
这里断点也设置和上面相同的条件Thread.currentThread().getName().contains("pool-1-thread-2") && nextn == 256
通过按F9
,查看扩容过程中老数组和新数组的数据,不断的变化,来加深对ConcurrentHashMap
扩容机制的理解。
总结
看文章只是对程序学习起辅助作用,很多时候还是需要自己去阅读源码,自己去操作一下,才能将这块的知识掌握得更牢固。源码下面没有秘密,源码阅读多了你就会发现,在实际开发中有很多需求或者设计都可以借鉴源码里面的设计思想,或者一些代码。简单来说就是你赋值粘贴的功力又精进了。
比如多线程下获取不会撞数的随机数:
ThreadLocalRandom.getProbe()
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
获取当前系统的cpu核数,可以用来初始化某些场景下线程池的核心线程数:
/** Number of CPUS, to place bounds on some sizings */
static final int NCPU = Runtime.getRuntime().availableProcessors();
当你的项目里面哪天需要使用cas了,你也可以来参考它源码的使用方式,等等。。。
微信公众号「 袋鼠先生的客栈 」,有问题评论区见。如果你觉得我的分享对你有帮助,或者觉得我有点东西,就支持一下我这个初出茅庐的writer,三连,三连,三连~~。点赞👍 关注❤️ 分享👥