ConcurrentHashMap支持多线程并发,但又不像Hashtable和Vector一样简单粗暴的加上synchronized关键字来完成,源码大量使用了cas来保证操作的原子性,效率比Hashtable和Vector要高,也是目前多线程开发中用的最多的map<K,V>类集合。
阅读本专栏之前,需要读者对多线程开发有一定的理解,并且要理解ConcurrentHashMap底层数组+链表+红黑树的数据结构。了解HashMap源码,可以帮助理解改专栏的内容。
该篇文章着重分析ConcurrentHashMap的初始化和扩容方法。
初始化initTable
//table初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//将table赋值给tab,同时循环判断table是否为空,不为空说明已经有线程将table初始化了,则直接返回。
while ((tab = table) == null || tab.length == 0) {
//将sizeCtl(sizeControl)赋给sc,并判断是否<0,
if ((sc = sizeCtl) < 0)
//sc < 0说明有线程正在执行初始化操作,此时让出一下cpu,再次被调度后,继续执行while的判断。相当于等待的过程。
Thread.yield(); // lost initialization race; just spin
//如果sc>=0,则说明需要初始化,使用Cas的方式将sizeCtl赋值为-1,这样其他线程进来时就会走到上面的if中去。
//根据返回值判断是否赋值成功,不成功的话,直接进行下一次循环,不成功的情况说明可能其他线程已经在初始化了。
//Unsafe.compareAndSwapInt解释:
//public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
//读取传入对象o在内存中偏移量为offset位置的值与期望值expected作比较,相等就把x值赋值给offset位置的值。方法返回true。不相等,就取消赋值,方法返回false。
//具体到下面的if判断就是:
// 检查ConcurrentHashMap对象在内存中偏移量为SIZECTL位置的int值(即为sizeCtl)与sc进行比较,相同就赋值为-1并返回true,不相等则取消赋值并返回false。
// SIZECTL是一个static final的常量,代表在当前ConcurrentHashMap对象中,sizeCtl变量在内存中的偏移量,private static final long SIZECTL;
// 详见ConcurrentHashMap代码最后的static代码块
// U = sun.misc.Unsafe.getUnsafe();
// Class<?> k = ConcurrentHashMap.class;
// SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//赋值成功后进入
//此处体现sizeCtl的一个含义,即sizeCtl = -1,说明正在有线程对table进行初始化。
try {
//再次赋值并判断tab是否为空,双重检查
//防止当前线程在执行上面的if和else if判断期间,有其他线程已经完成Tab的初始化
if ((tab = table) == null || tab.length == 0) {
//如果走到这,说明没有其他线程在对tab进行初始化,且在当前线程初始化完毕之前,不会有其他线程进来(通过sc < 0、U.compareAndSwapInt(this, SIZECTL, sc, -1)和双重检查实现)
//此时sc可能>0,也可能=0,大于0则n赋值为sc,等于0则n赋值为table默认初始大小DEFAULT_CAPACITY=16。
//sc > 0的情况,是调用构造方法时传入了tab的大小。
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//创建一个大小为n的Node<K,V>数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
//新数组赋值给tab和table
table = tab = nt;
// sc 赋值为n*0.75。
// n>>>2 n无符号右移2位为原来的1/4(0.25)
// n减掉n的1/4则为n*0.75,0.75为扩容因子。
sc = n - (n >>> 2);
}
} finally {
//sc赋值给sizeCtl
//此处天sizeCtl的一个含义,即数组扩容的阈值。
sizeCtl = sc;
}
//不管当前线程有没有将table初始化,走到这里说明table已经被初始化完成了,可以跳出循环了
break;
}
}
return tab;
}
帮助扩容helpTransfer
//帮助扩容
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//判断oldtable(即为传入的tab) 不为空 , 传入的当前f节点是ForwardingNode
//而且nextTab = f.nextTable 不为空,为空说明尚没有线程对老表进行扩容,扩容迁移期间,老表已经迁移完的节点会置为ForwardingNode,
//ForwardingNode的nextTable属性为迁移后的新表,这是为了在迁移过程中,如果有其他线程访问老表已经迁移完的元素查数据,可以通过nextTable查询。
//满足以上条件,额进入if开始扩容
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//仍然是先根据老表的长度计算扩容标识戳
int rs = resizeStamp(tab.length);
//nextTab == nextTable && table == tab 确认新还是扩容的新表,老表还是被扩容的老表。
//(sc = sizeCtl) < 0 判断当前是否仍在扩容过程中 sizeCtl = -1怎么办?
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// (sc >>> RESIZE_STAMP_SHIFT) != rs 判断是否是当前表的扩容标识戳,建addCount方法
// sc == rs + 1 || sc == rs + MAX_RESIZERS 判断是否已经所有线程退出扩容或达到扩容线程数上线
// 同addCount方法一样,这个地方同样有存在bug,
// 应该是应该是sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
// transferIndex <= 0 transferIndex的含义是,当前最小的已被分配给线程的数组元素下标,因为数组是从高到底迁移的,因此这个变量可以代表多线程迁移数组的进度。
// 小于等于0表示,当前数组的所有元素都已经分配给线程处理,当前线程不需要再帮助扩容了。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//使用cas的方式将sizeCtl + 1,可以理解为将低16位+1,扩容过程中的sizeCtl具体值没有意义,
//高16位为扩容标识戳,低16位为扩容线程数+1,应该这样去理解。
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 调用扩容方法
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
计算扩容标识戳resizeStamp
//根据数组长度n计算该数组扩容的标识戳
static final int resizeStamp(int n) {
//Integer.numberOfLeadingZeros(n) 该方法用来返回n的二进制数最高非0位之前0的个数,因为n是2的次幂,因此低位都是连续的1,高位都是连续的0,不通的高位0的个数是不一致的。
//(1 << (RESIZE_STAMP_BITS - 1) ,RESIZE_STAMP_BITS=16,因此这一句的含义是1右移15位,即为0000 0000 0000 0000 1000 0000 0000 0000
//例n = 16 则n的2进制为 0000 0000 0000 0000 0000 0000 0001 0000
//Integer.numberOfLeadingZeros(n) = 27, 转换为2进制为 0000 0000 0000 0000 0000 0000 0001 1011
//或的结果
// 0000 0000 0000 0000 0000 0000 0001 1011
// 或0000 0000 0000 0000 1000 0000 0000 0000
//等于0000 0000 0000 0000 1000 0000 0001 1011
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
扩容transfer
//线程扩容方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//计算数据迁移的步长,即每个线程应该迁移多少个元素
//NCPU>1标识多核CPU,多核cpu的情况下 将table.length 除以 8(即n >>> 3)后再除以cpu树,单核cpu不做处理。暂时不知道为啥这样计算
//如果计算后的步长小于MIN_TRANSFER_STRIDE(值为16),则步长改为16
//即每个线程至少迁移16个元素
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//第一个线程进来时,nextTab为Null
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//创建一个大小为2n的Node数组。
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
//赋值给nextTab
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
//内存溢出时,帮助退出外层循环
sizeCtl = Integer.MAX_VALUE;
return;
}
//赋值给nextTable
nextTable = nextTab;
//初始化transferIndex 为数组长度 n
//private transient volatile int transferIndex;
//用来记录多线程进行数据迁移时,当前最小的已被分配给线程的数组元素下标
//因为concurrentHashMap是按照数组下标从大到小迁移,因此初始值为tab的长度n
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// 外层循环用来控制循环迁移每一个需要当前线程迁移的元素
// concurrentHashMap数据迁移是按照数组下标从大到小迁移,首先给当前线程分配一个步长范围,然后在步长范围内从大到小挨个迁移。
// 当前步长迁移完毕后,继续计算下一个需要该线程迁移的步长,多线程帮助扩容的话,不通步长的范围边界是不连续的。
for (int i = 0, bound = 0;;) {
//f:当前要迁移的节点元素节点,fh:当前要迁移元素节点的额hash值
Node<K,V> f; int fh;
//这个while循环用来计算每次要处理的数组元素索引i和当前步长处理完后寻找新的步长范围。
//advance = true,代表需要继续计算元素索引i或新的步长范围。
while (advance) {
//nextIndex:下一个要处理的步长范围上限+1,nextBound:下一个要处理的步长范围下限
int nextIndex, nextBound;
//这个if用来判断数组元素索引i是否到了当前步长范围的下限边界,如果还不到,则跳出while循环,继续处理--i位置的元素
//如果finishing=true则跳出while循环,不再计算步长和索引
if (--i >= bound || finishing)
//跳出循环
advance = false;
//检查tab的所有元素是否都已分配完毕,并将transferIndex赋值给nextIndex
else if ((nextIndex = transferIndex) <= 0) {
//这个赋值是为了循环跳出后,当前线程能够走进下面的第一个if判断,去退出扩容
i = -1;
//跳出循环
advance = false;
}
//走到这说明当前线程已经将当前步长处理完毕(不满足--i>=bound)条件
//且数组元素尚未迁移完毕(不满足finishing = true),且数组的元素上没有完全分配给线程处理(不满足transferIndex <= 0)
//此时使用cas的方式将TRANSFERINDEX修改为下一个可处理步长范围的下线,
//代表当前线程已经分配到该位置了,下一个线程来请求分配步长的时候,应该从此处开始分配。
//同时完成给nextBound赋值
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//本次步长范围的下限
bound = nextBound;
//本次步长范围开始处理的第一个元素下标(最大的)
i = nextIndex - 1;
//跳出循环
advance = false;
}
}
// i<0 确保while循环中 第二个if判断[else if ((nextIndex = transferIndex) <= 0)]后可以走进来
// i >= n,i + n >= nextn 这两个条件是等价的,确保不会超过两个数组的最大值(没找到什么时候会触发这个条件)
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//finishing == true 说明已经迁移完毕
// finishing = true是在下面的if中赋值的,走进这个循环已经是迁移完毕后再次检查也结束了。
if (finishing) {
//完成table的赋值
nextTable = null;
table = nextTab;
//重新给sizeCtl赋值为扩容阈值,这个写法的结果是2n*0.75,应该都能看懂了吧
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//线程走到这说明当前线程已经完成了迁移,依次使用cas的方式将sizeCtl-1后退出。
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 这个条件是为了判断是否是最后一个线程
//最后一个线程迁移完毕后,不能拍拍屁股就走了,还有其他工作要做。
//记不记得首次有线程扩容时sc=(rs << RESIZE_STAMP_SHIFT) + 2,所以当(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT时,说明还不是最后一个线程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
//判断还不是最后一个线程,则允许退出
return;
//最后一个线程会走到这里,继续其他工作
//advance = true 是需要走到上面的while循环中,继续--i
//finishing = true 是说明迁移工作已经完成了
finishing = advance = true;
//i重新赋值为n,走上面while循环中的--i,再把所有的元素都检查一遍。
i = n; // recheck before commit
}
}
// 没有走进上面的if,说明当前步长的迁移工作尚未完成
// 此处判断一下当前元素是否为null
else if ((f = tabAt(tab, i)) == null)
//如果为null则使用cas的方式直接把改元素赋值为forwardingNode,代表此处已迁移完毕。
//修改成功过后,使用advance控制进行while循环
//说一下cas失败的情况:cas失败可能是有其它线程正在对当前元素进行赋值,此时advance=false,重新否for循环后不会进入到while循环
// i的值不会变,下次循环进来还是处理这个元素,如果此时赋值完成了,则会走到下面的else中去完成迁移。
advance = casTabAt(tab, i, null, fwd);
//根据当前元素的的哈希值判断是否是已经迁移完的元素,即forwardingNode,迁移完毕后的检查会走到这里。
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//走到这个else说明当前节点(下标为i)需要进行迁移
//上来还是先锁定当先数组元素对象(对于链表来说是头结点,对于红黑树来说是treebin节点)。
synchronized (f) {
//再次检查当前节点是否还是f节点,如果不是则啥也不干直接退出
//什么情况下会不相等呢,可能是有其他线程正在remove该节点吧
if (tabAt(tab, i) == f) {
//还是f,开始迁移
Node<K,V> ln, hn;
//fh >=0 说明是链表,其实应该不会有0的情况
if (fh >= 0) {
//注意此处跟HashMap类似,不会对节点再次计算哈希值,而是利用table的长度为2的次幂的原理,通过位运算来获取新的位置。
//n是2的次幂,因此2进制下,只有一位是1,假设n=16,则2进制为 0000 0000 0000 0000 0000 0000 0001 0000
//因此fh & n的结果只有 0 和 n 两个值。
//而根据fh计算数组下标的逻辑为 fh&(n-1)(n为2的次幂的情况下等价于对求余,可以自己算一算)
// n=16的情况下 n-1 的2进制为 0000 0000 0000 0000 0000 0000 0000 1111
// 2n-1 的2进制为 0000 0000 0000 0000 0000 0000 0001 1111
//因此 fh & (n-1)和 fh & (2n-1)的区别,仅仅是第5位是0或1的区别(n=16的情况下),反映到10进制就是 i 和 n + i的区别。
//可以得出一个结论:这一位的值为0,说明该节点还是这个下标,称之为低位;为1,说明该节点新的下标为n+i,称之为高位。
//runBit就是计算出来的这一位的值(十进制),用来区别高位和低位。
int runBit = fh & n;
//lastRun,这点跟HashMap不同,HashMap是从链表的头一个一个迁移
// 而ConcurrentHashMap会先计算该链表最后几个runBit位相同的Node,这几个不用一个一个迁移,可以作为一个子链表,直接挂过来。
// lastRun就是用来记录这个子链表的头结点。
//初始值为f,从链表头开始遍历。最好的情况下,整条链表的runBit是一样的,直接挂上即可;最坏的情况,最后两个节点的runBit不一致,只有最后一个节点能直接挂过来。
Node<K,V> lastRun = f;
//遍历链表开始循环
for (Node<K,V> p = f.next; p != null; p = p.next) {
//计算当前节点的runBit值
int b = p.hash & n;
//如果runBit有变化,则记录一下
//循环结束后,lastRun即为子链表的头结点
// runBit即为子链表的高位和低位标识
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//runBit == 0 说明子链表是低位,先把子链表挂到低位ln,此时高位hn上没有节点,因此为null
if (runBit == 0) {
ln = lastRun;
hn = null;
}
//else(runBit == n),先把子链表挂到高位hn,此时低位ln上没有节点,因此为null
else {
hn = lastRun;
ln = null;
}
//开始遍历链表上剩余的元素,从头结点f开始,到子链表的头结点lastRun的前一个节点为止。
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
//这就很好理解了,低位则根据当前节点的值声明一个节点挂到ln上,注意是每次插入都是从链表头插入。
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
//高位则根据当前节点的值声明一个节点挂到hn上,注意是每次插入都是从链表头插入。
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//将低位链表挂到nextTab的i位置。
setTabAt(nextTab, i, ln);
//将高位链表挂到nextTab的 i+n 位置。
setTabAt(nextTab, i + n, hn);
//将oldTable的i位置置为forwardingNode,代表该处已经迁移完毕。
setTabAt(tab, i, fwd);
//advance置为ture,继续走到上面的while循环中。
advance = true;
}
//判断一下f是不是红黑树节点,TreeBin的hash为-2,因此不会走到上面的链表处理中
else if (f instanceof TreeBin) {
//红黑树的迁移也比较简单,因为在链表转红黑树时,并没有破坏原有的链表结构,数据的增删改,也维持了链表结构。
//所以,根据链表即可以遍历所有的元素。
//这里多说一点,红黑树的节点TreeNode继承自链表的Node节点,保留了Node的next属性,但TreeNode还有自己的pre属性,这个是Node没有的,为什么要增加一个pre改成双向链表呢。
// 原因是:红黑树的删除操作要维持链表结构的话,就需要在remove掉一个节点后,把节点在链表上的前后位置的节点连接起来
// 而红黑树的remove操作是根据红黑树查找的算法找到的这个节点,如果只维持一个next属性的话,删除时,并不知道当前节点的前一个节点是谁,没法连接起来。
// 链表不存在该问题,因为链表从头开始遍历,肯定是从前一个节点遍历过去的。
//言归正传,先把f节点转换为TreeBin节点,并赋值给t
TreeBin<K,V> t = (TreeBin<K,V>)f;
//声明两个变量,记录一下低位链表的头和尾
TreeNode<K,V> lo = null, loTail = null;
//声明两个变量,记录一下高位链表的头和尾
TreeNode<K,V> hi = null, hiTail = null;
//低位和高位Node计数
int lc = 0, hc = 0;
//使用链表结果循环遍历
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
//将当前节点的值创建为TreeNode
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
//计算高位还是低位
if ((h & n) == 0) {
//loTail为null标识是首次向链表插入数据
//同时将当前节点的前一个节点指向尾结点,实现双向链表
if ((p.prev = loTail) == null)
//首次插入,将节点赋值给链表头
lo = p;
else
//否则将节点挂在链表尾,跟上面不同,这里是从尾部插入
loTail.next = p;
//这是后p是链表尾了,将p赋值给链表尾
loTail = p;
//计数
++lc;
}
else {
//同低位一样的操作,不再赘述
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//有点复杂,注意看
//(lc <= UNTREEIFY_THRESHOLD) 如果低位的节点数小于树化的临界值,则将低位链表lo反树化(untreeify(lo))
// 否则再看高位计数(hc)是不是0,如果高位计数是0,说明都是在低位链表上,这样直接把原来的TreeBin节点t拿过来,不用在耗费资源进行树化了。
// 如果高位计数不是0,说明既有高位又有低位,则将lo链表初始化为TreeBin(new TreeBin<K,V>(lo)),初始化TreeBin的过程中会将链表树化。
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
// 高位的链表同样处理
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//将低位ln(可能为链表或树)挂在nextTab的i位置。
setTabAt(nextTab, i, ln);
//将高位ln(可能为链表或树)挂在nextTab的i + n位置。
setTabAt(nextTab, i + n, hn);
//将oldTable的i位置置为forwardingNode,代表该处已经迁移完毕。
setTabAt(tab, i, fwd);
//advance置为ture,继续走到上面的while循环中。
advance = true;
}
}
}
}
}
}