一. 面试题及剖析
1. 今日面试题
2. 题目剖析
壹哥在前面4篇文章中,给大家介绍了ConcurrentHashMap的通用功能、特点,以及JDK 7、8中ConcurrentHashMap的底层数据结构,还有ConcurrentHashMap中的核心属性等内容,文章链接如下:
高薪程序员&面试题精讲系列45之你熟悉ConcurrentHashMap吗?
高薪程序员&面试题精讲系列46之说说JDK7中ConcurrentHashMap的底层原理,有哪些数据结构
高薪程序员&面试题精讲系列47之说说JDK8中ConcurrentHashMap的底层原理,HashMap与ConcurrentHashMap有什么区别?
高薪程序员&面试题精讲系列48之说说JDK8中ConcurrentHashMap的sizeCtl是什么意思?最大容量、负载因子是多少?
高薪程序员&面试题精讲系列49之说说ConcurrentHashMap#put方法的源码及数据添加流程
从本文开始,壹哥给大家重点分析JDK 8中ConcurrentHashMap#transfer()方法的源码,重点讲解扩容机制,这一块是咱们面试时的高频考点。
二. transfer()扩容方法的实现逻辑【重点】
1. ConcurrentHashMap的扩容机制
我们在面试时,ConcurrentHashMap中的另一个常考点则是关于数组的扩容机制,所以接下来 壹哥 重点分析ConcurrentHashMap是如何进行扩容的。
当ConcurrentHashMap容量不足时,需要对table数组进行扩容,其扩容的基本逻辑其实跟HashMap是很像的,并且ConcurrentHashMap的扩容也是翻倍扩容,扩容后数组容量为原来的 2 倍。但由于它是支持并发扩容的,所以要更复杂一些,因为它支持多线程进行扩容操作时并没有加锁。
这样做的目的不仅仅是为了满足并发的要求,还是希望利用并发处理来减少扩容时所带来的时间影响。因为在扩容时,总是会涉及到从一个“数组”到另一个“数组”的拷贝操作,如果这个操作能够并发进行,那是非常高效的。
2. 扩容机制的实现步骤
ConcurrentHashMap扩容操作的实现步骤可以分为两个部分,这两部分如下:
2.1 单线程中构建nextTable数组
该数组的大体实现过程其实就是一个遍历、复制的过程。
该过程会根据运算得到需要遍历的次数i,然后利用tabAt()方法获得i位置上的元素,并进行如下判断:
遍历过所有的节点以后,就完成了数据的复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。
2.2 多线程中的节点遍历
在transfer()方法的源码中有一个判断,如果遍历到的节点是forward节点,就向后继续遍历,再加上给节点的上锁操作,就完成了多线程的控制。多线程遍历节点时,处理了一个节点,就把对应节点的值设置为forward;另一个线程看到forward后,就会向后遍历。这样交叉操作就完成了复制工作,而且还很好的解决了线程安全的问题。
3. transfer()扩容方法
transfer()扩容方法是一个非常高效且复杂的方法,在阅读具体的 transfer 源码之前,我们先来了解一下什么时候会触发ConcurrentHashMap的扩容操作,有以下两种情况下可能触发扩容操作:
4. transfer()源码
接下来我们看看transfer()方法的源码,看看ConcurrentHashMap是如何实现扩容的。
/**
* 一个过渡的table表 只有在扩容的时候才会使用
*/
private transient volatile Node<K,V>[] nextTable;
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//构造一个nextTable对象 它的容量是原来的两倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//构造一个连节点指针 用于标志位
boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//这个while循环体的作用就是在控制i-- 通过i--可以依次遍历原hash表中的节点
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//如果所有的节点都已经完成复制工作 就把nextTable赋值给table 清空临时对象nextTable
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍
return;
}
//利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果遍历到的节点为空 则放入ForwardingNode指针
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//节点上锁
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//如果fh>=0 证明这是一个Node节点
if (fh >= 0) {
int runBit = fh & n;
//以下的部分在完成的工作是构造两个链表 一个是原链表 另一个是原链表的反序排列
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i--操作
advance = true;
}
//对TreeBin对象进行处理 与上面的过程类似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//构造正序和反序两个链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果扩容后已经不再需要tree的结构 反向转换为链表结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i--操作
advance = true;
}
}
}
}
}
}
transfer()方法真的很复杂,相信很多人都没耐心一行行阅读下来,即使阅读了一时之间也搞不懂啥意思,所以 壹哥 给各位把上面的源码分拆解读一下。
5. transfer()源码解读【重点】
5.1 设定步长stride
transfer()方法中首先会设定一个步长stride,这个stride的最小值等于MIN_TRANSFER_STRIDE=16
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
5.2 对nextTab数组进行初始化
接着判断nextTab是否为空,如果传入的nextTab为空,就会对其进行初始化。外围会保证,第一个发起迁移的线程调用此方法时,nextTab=null,后面的线程再调用时nextTab就不再为null。然后再将nextTab赋值给全局变量nextTable。
if (nextTab == null) {// initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
5.3 创建ForwardingNode节点
接下来新建了一个ForwardingNode节点,这个节点的哈希值=MOVED=-1。
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
5.4 执行一个死循环
然后会在一个死循环中进行状态的改变,这里可能会存在3种情况。
第1种情况,--i>=bound或者finshing是True:
对i减1以后,判断i是否大于边界的最小值bound。如果小于bound,则说明上次领取的一个区间任务已经完成,需要领取下一个区间任务。通常境况下,第一次进入循环时,i这个判断会无法通过,会执行下面的nextIndex赋值操作。
第2种情况,如果小于等于0,说明没有区间了,则将 i 改成 -1,且推进状态变成 false,就不再推进了,表示扩容已结束,当前线程可以退出了。这个 -1 会在下面的 if 块里判断,从而进入到完成状态。
第3种情况,就是去领取线程的任务区间,下限是bound,上限是i,bound=nextIndex-stride,i=nextIndex。
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
5.5 判断是否结束当前线程的扩容任务
如果 i 小于0,或者不在 tab数组的下标范围内,就按照上面的判断,领取最后一段区间任务,结束当前线程的扩容任务。
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//将新的 nextTab 赋值给 table 属性,完成迁移
nextTable = null;
table = nextTab; // 更新 table
//重新计算 sizeCtl:n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
5.6 创建fwd节点进行占位
接下来判断,如果旧的tab中,i位置上的值为null,就会用CAS写入一个fwd节点进行占位。
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
5.7 加锁进行数据添加
接下来这部分是扩容的关键代码。
else {// 到这里,说明这个位置有实际值了,且不是占位符。对这个节点上锁。为什么上锁,防止 putVal 的时候向链表插入数据
synchronized (f) {
// 判断 i 下标处的桶节点是否和 f 相同
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;// low, height 高位桶,低位桶
// 如果 f 的 hash 值大于 0 。TreeBin 的 hash 是 -2
if (fh >= 0) {
// 对老长度进行与运算(第一个操作数的的第n位于第二个操作数的第n位如果都是1,那么结果的第n为也为1,否则为0)
// 由于 Map 的长度都是 2 的次方(000001000 这类的数字),那么取于 length 只有 2 种结果,一种是 0,一种是1
// 如果是结果是0 ,Doug Lea 将其放在低位,反之放在高位,目的是将链表重新 hash,放到对应的位置上,让新的取于算法能够击中他。
int runBit = fh & n;
Node<K,V> lastRun = f; // 尾节点,且和头节点的 hash 值取于不相等
// 遍历这个桶
for (Node<K,V> p = f.next; p != null; p = p.next) {
// 取于桶中每个节点的 hash 值
int b = p.hash & n;
// 如果节点的 hash 值和首节点的 hash 值取于结果不同
if (b != runBit) {
runBit = b; // 更新 runBit,用于下面判断 lastRun 该赋值给 ln 还是 hn。
lastRun = p; // 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环
}
}
if (runBit == 0) {// 如果最后更新的 runBit 是 0 ,设置低位节点
ln = lastRun;
hn = null;
}
else {
hn = lastRun; // 如果最后更新的 runBit 是 1, 设置高位节点
ln = null;
}// 再次循环,生成两个链表,lastRun 作为停止条件,这样就是避免无谓的循环(lastRun 后面都是相同的取于结果)
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 如果与运算结果是 0,那么就还在低位
if ((ph & n) == 0) // 如果是0 ,那么创建低位节点
ln = new Node<K,V>(ph, pk, pv, ln);
else // 1 则创建高位
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 其实这里类似 hashMap
// 设置低位链表放在新链表的 i
setTabAt(nextTab, i, ln);
// 设置高位链表,在原有长度上加 n
setTabAt(nextTab, i + n, hn);
// 将旧的链表设置成占位符
setTabAt(tab, i, fwd);
// 继续向后推进
advance = true;
上面的代码挺难理解,我们可以配合下图帮助理解。图中灰色的圆圈代表fh & n = 0,绿色的圆圈代表fh & n ! =0。经过一轮循环以后,LastRun=8,Runbit=0,那么ln=LastRun(8节点),hn=null。后面再经过一轮循环,这样就可以将ln低位链表和hn高位链表都获取到。
5.8 链表转为红黑树
最后一段代码,则是判断是否需要进行红黑树的转移。
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 遍历
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 和链表相同的判断,与运算 == 0 的放在低位
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
} // 不是 0 的放在高位
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位树
setTabAt(nextTab, i, ln);
// 高位数
setTabAt(nextTab, i + n, hn);
// 旧的设置成占位符
setTabAt(tab, i, fwd);
// 继续向后推进
advance = true;
}
}
}
6. transfer()总结【重点】
这个transfer()方法真的太复杂啦,我们很难记得住,所以壹哥给大家做个简单的总结,我们只需记住transfer()中大致执行了以下几个功能即可:
三. 结语
本篇文章,壹哥 重点讲解了JDK 8版本ConcurrentHashMap#transfer()扩容方法的源码以及扩容机制的流程。接下来 壹哥 会再通过另一篇文章,给大家讲解 JDK 8中的ConcurrentHashMap#get()方法的源码,请做好准备哦。