0
点赞
收藏
分享

微信扫一扫

ConcurrentHashMap源码深度解析(没有比这更详细的了)-2

月白色的大狒 2022-03-12 阅读 92
java

ConcurrentHashMap支持多线程并发,但又不像Hashtable和Vector一样简单粗暴的加上synchronized关键字来完成,源码大量使用了cas来保证操作的原子性,效率比Hashtable和Vector要高,也是目前多线程开发中用的最多的map<K,V>类集合。

阅读本专栏之前,需要读者对多线程开发有一定的理解,并且要理解ConcurrentHashMap底层数组+链表+红黑树的数据结构。了解HashMap源码,可以帮助理解改专栏的内容。

    该篇文章着重分析ConcurrentHashMap的初始化和扩容方法。


初始化initTable

	//table初始化   
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
		//将table赋值给tab,同时循环判断table是否为空,不为空说明已经有线程将table初始化了,则直接返回。
        while ((tab = table) == null || tab.length == 0) {
			//将sizeCtl(sizeControl)赋给sc,并判断是否<0,
            if ((sc = sizeCtl) < 0)
				//sc < 0说明有线程正在执行初始化操作,此时让出一下cpu,再次被调度后,继续执行while的判断。相当于等待的过程。
                Thread.yield(); // lost initialization race; just spin
			//如果sc>=0,则说明需要初始化,使用Cas的方式将sizeCtl赋值为-1,这样其他线程进来时就会走到上面的if中去。
			//根据返回值判断是否赋值成功,不成功的话,直接进行下一次循环,不成功的情况说明可能其他线程已经在初始化了。
			//Unsafe.compareAndSwapInt解释:
			//public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
			//读取传入对象o在内存中偏移量为offset位置的值与期望值expected作比较,相等就把x值赋值给offset位置的值。方法返回true。不相等,就取消赋值,方法返回false。
			//具体到下面的if判断就是:
			//  检查ConcurrentHashMap对象在内存中偏移量为SIZECTL位置的int值(即为sizeCtl)与sc进行比较,相同就赋值为-1并返回true,不相等则取消赋值并返回false。
			//  SIZECTL是一个static final的常量,代表在当前ConcurrentHashMap对象中,sizeCtl变量在内存中的偏移量,private static final long SIZECTL;
			//  详见ConcurrentHashMap代码最后的static代码块
			//          U = sun.misc.Unsafe.getUnsafe();
			//			Class<?> k = ConcurrentHashMap.class;
			//			SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
				//赋值成功后进入
				//此处体现sizeCtl的一个含义,即sizeCtl = -1,说明正在有线程对table进行初始化。
                try {
					//再次赋值并判断tab是否为空,双重检查
					//防止当前线程在执行上面的if和else if判断期间,有其他线程已经完成Tab的初始化
                    if ((tab = table) == null || tab.length == 0) {
						//如果走到这,说明没有其他线程在对tab进行初始化,且在当前线程初始化完毕之前,不会有其他线程进来(通过sc < 0、U.compareAndSwapInt(this, SIZECTL, sc, -1)和双重检查实现)
						//此时sc可能>0,也可能=0,大于0则n赋值为sc,等于0则n赋值为table默认初始大小DEFAULT_CAPACITY=16。
						//sc > 0的情况,是调用构造方法时传入了tab的大小。
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
						
						//创建一个大小为n的Node<K,V>数组
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
						//新数组赋值给tab和table
                        table = tab = nt;
						// sc 赋值为n*0.75。
						// n>>>2 n无符号右移2位为原来的1/4(0.25)
						// n减掉n的1/4则为n*0.75,0.75为扩容因子。
                        sc = n - (n >>> 2);
                    }
                } finally {
					//sc赋值给sizeCtl
					//此处天sizeCtl的一个含义,即数组扩容的阈值。
                    sizeCtl = sc;
                }
				//不管当前线程有没有将table初始化,走到这里说明table已经被初始化完成了,可以跳出循环了
                break;
            }
        }
        return tab;
    }

帮助扩容helpTransfer

	//帮助扩容
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
		//判断oldtable(即为传入的tab) 不为空 , 传入的当前f节点是ForwardingNode 
		//而且nextTab = f.nextTable 不为空,为空说明尚没有线程对老表进行扩容,扩容迁移期间,老表已经迁移完的节点会置为ForwardingNode,
		//ForwardingNode的nextTable属性为迁移后的新表,这是为了在迁移过程中,如果有其他线程访问老表已经迁移完的元素查数据,可以通过nextTable查询。
		//满足以上条件,额进入if开始扩容
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
			//仍然是先根据老表的长度计算扩容标识戳
            int rs = resizeStamp(tab.length);
			//nextTab == nextTable && table == tab  确认新还是扩容的新表,老表还是被扩容的老表。
			//(sc = sizeCtl) < 0 判断当前是否仍在扩容过程中 sizeCtl = -1怎么办?
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
				// (sc >>> RESIZE_STAMP_SHIFT) != rs  判断是否是当前表的扩容标识戳,建addCount方法
				//  sc == rs + 1 || sc == rs + MAX_RESIZERS 判断是否已经所有线程退出扩容或达到扩容线程数上线
                //                                          同addCount方法一样,这个地方同样有存在bug,
				//                                          应该是应该是sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS 
				//  transferIndex <= 0 transferIndex的含义是,当前最小的已被分配给线程的数组元素下标,因为数组是从高到底迁移的,因此这个变量可以代表多线程迁移数组的进度。
				//                     小于等于0表示,当前数组的所有元素都已经分配给线程处理,当前线程不需要再帮助扩容了。
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
				//使用cas的方式将sizeCtl + 1,可以理解为将低16位+1,扩容过程中的sizeCtl具体值没有意义,
				//高16位为扩容标识戳,低16位为扩容线程数+1,应该这样去理解。
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
					// 调用扩容方法
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

计算扩容标识戳resizeStamp

	//根据数组长度n计算该数组扩容的标识戳
	static final int resizeStamp(int n) {
		//Integer.numberOfLeadingZeros(n) 该方法用来返回n的二进制数最高非0位之前0的个数,因为n是2的次幂,因此低位都是连续的1,高位都是连续的0,不通的高位0的个数是不一致的。
		//(1 << (RESIZE_STAMP_BITS - 1) ,RESIZE_STAMP_BITS=16,因此这一句的含义是1右移15位,即为0000 0000 0000 0000 1000 0000 0000 0000
		//例n = 16 则n的2进制为 0000 0000 0000 0000 0000 0000 0001 0000
		//Integer.numberOfLeadingZeros(n) = 27, 转换为2进制为 0000 0000 0000 0000 0000 0000 0001 1011
		//或的结果 
		//    0000 0000 0000 0000 0000 0000 0001 1011
		//  或0000 0000 0000 0000 1000 0000 0000 0000
		//等于0000 0000 0000 0000 1000 0000 0001 1011
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

扩容transfer

	//线程扩容方法
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
		//计算数据迁移的步长,即每个线程应该迁移多少个元素
		//NCPU>1标识多核CPU,多核cpu的情况下 将table.length 除以 8(即n >>> 3)后再除以cpu树,单核cpu不做处理。暂时不知道为啥这样计算
		//如果计算后的步长小于MIN_TRANSFER_STRIDE(值为16),则步长改为16
		//即每个线程至少迁移16个元素
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
		//第一个线程进来时,nextTab为Null
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
				//创建一个大小为2n的Node数组。
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
				//赋值给nextTab
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
				//内存溢出时,帮助退出外层循环
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
			//赋值给nextTable
            nextTable = nextTab;
			//初始化transferIndex 为数组长度 n
			//private transient volatile int transferIndex;
			//用来记录多线程进行数据迁移时,当前最小的已被分配给线程的数组元素下标
			//因为concurrentHashMap是按照数组下标从大到小迁移,因此初始值为tab的长度n
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
		// 外层循环用来控制循环迁移每一个需要当前线程迁移的元素
		// concurrentHashMap数据迁移是按照数组下标从大到小迁移,首先给当前线程分配一个步长范围,然后在步长范围内从大到小挨个迁移。
		// 当前步长迁移完毕后,继续计算下一个需要该线程迁移的步长,多线程帮助扩容的话,不通步长的范围边界是不连续的。
        for (int i = 0, bound = 0;;) {
			//f:当前要迁移的节点元素节点,fh:当前要迁移元素节点的额hash值
            Node<K,V> f; int fh;
			//这个while循环用来计算每次要处理的数组元素索引i和当前步长处理完后寻找新的步长范围。
			//advance = true,代表需要继续计算元素索引i或新的步长范围。
            while (advance) {
				//nextIndex:下一个要处理的步长范围上限+1,nextBound:下一个要处理的步长范围下限
                int nextIndex, nextBound;
				//这个if用来判断数组元素索引i是否到了当前步长范围的下限边界,如果还不到,则跳出while循环,继续处理--i位置的元素
				//如果finishing=true则跳出while循环,不再计算步长和索引
                if (--i >= bound || finishing)
					//跳出循环
                    advance = false;
				//检查tab的所有元素是否都已分配完毕,并将transferIndex赋值给nextIndex
                else if ((nextIndex = transferIndex) <= 0) {
					//这个赋值是为了循环跳出后,当前线程能够走进下面的第一个if判断,去退出扩容
                    i = -1;
					//跳出循环
                    advance = false;
                }
				//走到这说明当前线程已经将当前步长处理完毕(不满足--i>=bound)条件
				//且数组元素尚未迁移完毕(不满足finishing = true),且数组的元素上没有完全分配给线程处理(不满足transferIndex <= 0)
				//此时使用cas的方式将TRANSFERINDEX修改为下一个可处理步长范围的下线,
				//代表当前线程已经分配到该位置了,下一个线程来请求分配步长的时候,应该从此处开始分配。
				//同时完成给nextBound赋值
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
					//本次步长范围的下限	   
                    bound = nextBound;
					//本次步长范围开始处理的第一个元素下标(最大的)
                    i = nextIndex - 1;
					//跳出循环
                    advance = false;
                }
            }
			// i<0 确保while循环中 第二个if判断[else if ((nextIndex = transferIndex) <= 0)]后可以走进来
			// i >= n,i + n >= nextn  这两个条件是等价的,确保不会超过两个数组的最大值(没找到什么时候会触发这个条件)
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
				//finishing == true 说明已经迁移完毕
				// finishing = true是在下面的if中赋值的,走进这个循环已经是迁移完毕后再次检查也结束了。
                if (finishing) {
					//完成table的赋值
                    nextTable = null;
                    table = nextTab;
					//重新给sizeCtl赋值为扩容阈值,这个写法的结果是2n*0.75,应该都能看懂了吧
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
				//线程走到这说明当前线程已经完成了迁移,依次使用cas的方式将sizeCtl-1后退出。
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
					//(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 这个条件是为了判断是否是最后一个线程
					//最后一个线程迁移完毕后,不能拍拍屁股就走了,还有其他工作要做。
					//记不记得首次有线程扩容时sc=(rs << RESIZE_STAMP_SHIFT) + 2,所以当(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT时,说明还不是最后一个线程
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
						//判断还不是最后一个线程,则允许退出
                        return;
					//最后一个线程会走到这里,继续其他工作
					//advance = true 是需要走到上面的while循环中,继续--i
					//finishing = true 是说明迁移工作已经完成了
                    finishing = advance = true;
					//i重新赋值为n,走上面while循环中的--i,再把所有的元素都检查一遍。
                    i = n; // recheck before commit
                }
            }
			// 没有走进上面的if,说明当前步长的迁移工作尚未完成
			// 此处判断一下当前元素是否为null
            else if ((f = tabAt(tab, i)) == null)
				//如果为null则使用cas的方式直接把改元素赋值为forwardingNode,代表此处已迁移完毕。
				//修改成功过后,使用advance控制进行while循环
				//说一下cas失败的情况:cas失败可能是有其它线程正在对当前元素进行赋值,此时advance=false,重新否for循环后不会进入到while循环
				//                     i的值不会变,下次循环进来还是处理这个元素,如果此时赋值完成了,则会走到下面的else中去完成迁移。
                advance = casTabAt(tab, i, null, fwd);
			//根据当前元素的的哈希值判断是否是已经迁移完的元素,即forwardingNode,迁移完毕后的检查会走到这里。
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
				//走到这个else说明当前节点(下标为i)需要进行迁移
				//上来还是先锁定当先数组元素对象(对于链表来说是头结点,对于红黑树来说是treebin节点)。
                synchronized (f) {
					//再次检查当前节点是否还是f节点,如果不是则啥也不干直接退出
					//什么情况下会不相等呢,可能是有其他线程正在remove该节点吧
                    if (tabAt(tab, i) == f) {
						//还是f,开始迁移
                        Node<K,V> ln, hn;
						//fh >=0 说明是链表,其实应该不会有0的情况
                        if (fh >= 0) {
							//注意此处跟HashMap类似,不会对节点再次计算哈希值,而是利用table的长度为2的次幂的原理,通过位运算来获取新的位置。
							//n是2的次幂,因此2进制下,只有一位是1,假设n=16,则2进制为 0000 0000 0000 0000 0000 0000 0001 0000
							//因此fh & n的结果只有 0 和 n 两个值。
							//而根据fh计算数组下标的逻辑为 fh&(n-1)(n为2的次幂的情况下等价于对求余,可以自己算一算)
							//   n=16的情况下 n-1 的2进制为 0000 0000 0000 0000 0000 0000 0000 1111
							//               2n-1 的2进制为 0000 0000 0000 0000 0000 0000 0001 1111
							//因此 fh & (n-1)和 fh & (2n-1)的区别,仅仅是第5位是0或1的区别(n=16的情况下),反映到10进制就是 i 和 n + i的区别。
							//可以得出一个结论:这一位的值为0,说明该节点还是这个下标,称之为低位;为1,说明该节点新的下标为n+i,称之为高位。
							//runBit就是计算出来的这一位的值(十进制),用来区别高位和低位。
                            int runBit = fh & n;
							//lastRun,这点跟HashMap不同,HashMap是从链表的头一个一个迁移
							//         而ConcurrentHashMap会先计算该链表最后几个runBit位相同的Node,这几个不用一个一个迁移,可以作为一个子链表,直接挂过来。
							//         lastRun就是用来记录这个子链表的头结点。
							//初始值为f,从链表头开始遍历。最好的情况下,整条链表的runBit是一样的,直接挂上即可;最坏的情况,最后两个节点的runBit不一致,只有最后一个节点能直接挂过来。
                            Node<K,V> lastRun = f;
							//遍历链表开始循环
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
								//计算当前节点的runBit值
                                int b = p.hash & n;
								//如果runBit有变化,则记录一下
								//循环结束后,lastRun即为子链表的头结点
								//            runBit即为子链表的高位和低位标识
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
							//runBit == 0 说明子链表是低位,先把子链表挂到低位ln,此时高位hn上没有节点,因此为null
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
							//else(runBit == n),先把子链表挂到高位hn,此时低位ln上没有节点,因此为null
                            else {
				                hn = lastRun;
                                ln = null;
                            }
							//开始遍历链表上剩余的元素,从头结点f开始,到子链表的头结点lastRun的前一个节点为止。
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
								//这就很好理解了,低位则根据当前节点的值声明一个节点挂到ln上,注意是每次插入都是从链表头插入。
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
								//高位则根据当前节点的值声明一个节点挂到hn上,注意是每次插入都是从链表头插入。
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
							//将低位链表挂到nextTab的i位置。
                            setTabAt(nextTab, i, ln);
							//将高位链表挂到nextTab的 i+n 位置。
                            setTabAt(nextTab, i + n, hn);
							//将oldTable的i位置置为forwardingNode,代表该处已经迁移完毕。
                            setTabAt(tab, i, fwd);
							//advance置为ture,继续走到上面的while循环中。
                            advance = true;
                        }
						//判断一下f是不是红黑树节点,TreeBin的hash为-2,因此不会走到上面的链表处理中
                        else if (f instanceof TreeBin) {
							//红黑树的迁移也比较简单,因为在链表转红黑树时,并没有破坏原有的链表结构,数据的增删改,也维持了链表结构。
							//所以,根据链表即可以遍历所有的元素。
							//这里多说一点,红黑树的节点TreeNode继承自链表的Node节点,保留了Node的next属性,但TreeNode还有自己的pre属性,这个是Node没有的,为什么要增加一个pre改成双向链表呢。
							//    原因是:红黑树的删除操作要维持链表结构的话,就需要在remove掉一个节点后,把节点在链表上的前后位置的节点连接起来
							//            而红黑树的remove操作是根据红黑树查找的算法找到的这个节点,如果只维持一个next属性的话,删除时,并不知道当前节点的前一个节点是谁,没法连接起来。
							//            链表不存在该问题,因为链表从头开始遍历,肯定是从前一个节点遍历过去的。
							//言归正传,先把f节点转换为TreeBin节点,并赋值给t
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
							//声明两个变量,记录一下低位链表的头和尾
                            TreeNode<K,V> lo = null, loTail = null;
							//声明两个变量,记录一下高位链表的头和尾
                            TreeNode<K,V> hi = null, hiTail = null;
							//低位和高位Node计数
                            int lc = 0, hc = 0;
							//使用链表结果循环遍历
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
								//将当前节点的值创建为TreeNode
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
								//计算高位还是低位
                                if ((h & n) == 0) {
									//loTail为null标识是首次向链表插入数据
									//同时将当前节点的前一个节点指向尾结点,实现双向链表
                                    if ((p.prev = loTail) == null)
										//首次插入,将节点赋值给链表头
                                        lo = p;
                                    else
										//否则将节点挂在链表尾,跟上面不同,这里是从尾部插入
                                        loTail.next = p;
									//这是后p是链表尾了,将p赋值给链表尾
                                    loTail = p;
									//计数
                                    ++lc;
                                }
                                else {
									//同低位一样的操作,不再赘述
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
							//有点复杂,注意看
							//(lc <= UNTREEIFY_THRESHOLD) 如果低位的节点数小于树化的临界值,则将低位链表lo反树化(untreeify(lo))
							// 否则再看高位计数(hc)是不是0,如果高位计数是0,说明都是在低位链表上,这样直接把原来的TreeBin节点t拿过来,不用在耗费资源进行树化了。
							//                              如果高位计数不是0,说明既有高位又有低位,则将lo链表初始化为TreeBin(new TreeBin<K,V>(lo)),初始化TreeBin的过程中会将链表树化。
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
							// 高位的链表同样处理
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
							//将低位ln(可能为链表或树)挂在nextTab的i位置。
                            setTabAt(nextTab, i, ln);
							//将高位ln(可能为链表或树)挂在nextTab的i + n位置。
                            setTabAt(nextTab, i + n, hn);
							//将oldTable的i位置置为forwardingNode,代表该处已经迁移完毕。
                            setTabAt(tab, i, fwd);
							//advance置为ture,继续走到上面的while循环中。
                            advance = true;
                        }
                    }
                }
            }
        }
    }
举报

相关推荐

ConcurrentHashMap1.8源码解析

0 条评论