https://blog.csdn.net/qq_29031555/article/details/105907234
前言
之前研读过HashMap源码,实现过一遍HashMap源码。但是HashMap是线程不安全的,而ConcurrentHashMap是线程安全的,博主我觉得自己多线程方面差了点,遂决定去研究一番ConcurrentHashMap。这一看,emmm,妙呀,有时一个方法起码得研究好几天,打开一篇知识的海洋。笔者去年终于看完整个put方法的处理,〒▽〒,实现到扩容那里了。后面由于换工作,面试,这事就耽搁下了,如今我重新捡起来,主要是我刚写完一篇HashMap的源码解读。嗯,我最近也在并行了解分布式事务。换着来,比较不那么容易腻一些。 看这篇之前,建议先了解下HashMap底层,因为java8的ConcurrentHashMap许多原理与HashMap是一致,和HashMap一样的内容,就不细述了。👉java8 HashMap源码解读
anyway,研究完这个,我发现了一件事。好像对于高并发的场景,无论这个ConcurrentHashMap还是分布式系统,从宏观来说,解决的基本思路就是分治思想,就像大禹治水,堵不如疏。
概述
ConcurrentHashMap是concurrent包下的线程安全的Map集合。HashMap是线程不安全,HashTable是线程安全,但是它是对整个hash表加锁,锁的范围太大。
java7时的ConcurrentHashMap
结构
由HashTable可知,在多核多CPU的情况下,多线程可以并行,但是当遇到了HashTable时就变得不一样了。由于整个HashTable加了锁,每次只有一个线程能拿到锁执行,其他线程等待。本来每次可以同时干多件事,变成每次只能干一件事,效率就低了。而1.7的ConcurrentHashMap将锁的范围变小,每次不锁整个表,可锁的范围分成多块,代表可以有多个线程同时干活。
而这时的ConcurrentHashMap就不一样了。 当put一个元素时: 1.通过对key的hash计算得到一个下标,是Segment[]的下标 2.线程申请该segment的锁,拿到segment[i]锁 2.1.如果获取不到锁,循环一定次数去尝试拿锁,超过一定次数没获取到锁后,线程进入 阻塞状态等待。 3.开始对segment[i]的hashEntry进行put操作。 3.1Segment继承自ReentrantLock,独占的可重入式锁。所以拿到Segment后的操作不需要考虑并发,因为Segment是独占锁。
由此可知,Segment[]代表着并发度,16个Segment代表可以同时有16线程并行,而Segment是不会扩容的,只有Segment[i]下的HashEntry会扩容。当并发量越来越来大,即使有个16个并发度,对于数量大的线程数面前是不够用的,供小于求,照样会有大量的线程进入阻塞状态。
所以就有1.8的优化,直接抛弃segment这种方式。
java8的ConcurrentHashMap
如图,1.8时ConcurrentHashMap的结构与HashMap一样,就是一个单纯的链表数组,计算hash、寻址、扩容时链表拆分等基本计算方法和HashMap的方式是一样的。没有Segment[],所以它的并发度就是数组的长度。既然如此,没了Segement可重入锁,如何保证线程安全,put和扩容的线程安全如何保障?解决方法在于对CAS的使用,和对线程的控制。
前置知识点
在开始解析concurrentHashMap前,需要知道的一些知识点
CAS
CAS即比较并交换,首先线程从内存中读取x的值a,而后再将a值于当前内存中x的值,设此时内存x的值为b;比较a和b,若期待值a与b相同,则修改内存中x的值为c,不同则不修改。这是一种乐观锁策略,不加锁。
在多线程的情况下,CAS会产生ABA的问题: 内存中有X值为1,A线程拿到X的值为1,这时B线程抢占cpu时间片,将内存中的X先修改为2, 后又修改为1;这时线程A再回来查看,发现X的值还是1,遂修改为3。 ABA的问题好像不是什么大问题,但在程序中,值一样不代表没有变化,比如内存地址可能变了。
· 解决这种有两个方法: ①使得比较修改这部分操作具有原子性,比如再秒杀场景中,使用lua脚本操作redis修改数据, 因为redis只支持乐观锁,lua程序执行是原子性的。 ②增加版本号,每修改一次,版本号加1,每次比较根据版本号判断。
线程的内存模型
java的线程内存模型如下图:↓
首先java内存模型规定所有变量都存在主存中,每个线程都有自己的工作内存,所有操作都在工作内存进行,不能直接操作主存,也不能操作其他线程的工作内存。
线程修改一个变量,先修改工作内存中的值,再写入主存中(什么时候写入主存未知)。 那么对于多个线程的共享的变量来说,假如线程a,线程b共同操作变量y,线程a修改了y变量值为2 ,还没有写入主存中,那线程b首先能直接操作的是自己工作内存,这时自己的工作内存中y的值不是 2,那么如何使得线程b知道变量y的值已变成2了呢? · 对于这个问题,java提供一个关键字volatile,使得共享变量可见。具体步骤如下: 当一个变量x被volatile关键字修饰时,当线程1在工作内存修改了x的值时,会强制将x的值写入主存中; 其次由于线程1的修改操作,会导致线程2工作内存中x的缓存无效,所以当线程1要读取x时,会到主存中读取.
要注意的是,volatile关键字能保证操作的可见性,但它没法保证操作的原子性: · 比如,有一共享变量y值为1. · 线程1首先获取了共享变量y的值 1。 · 而这时线程2抢占cpu时间片获取共享变量y,并修改将y的值加1等于2,随后刷新主存中y的 值; · 那么这时线程1将之前获取到的y的值加1,变成2,刷新主存y值变成2; · 按理来说,y是共享变量,两条线程都对其进行加1操作,主存中的值应该是3才对.但是由于线程对共享变量y的加1操作不具有原子性,导致最后结果的误差.仔细想想,这个事好像事务的隔离级别的那种.
线程的并发和并行
并发 并发是指,多个线程在一个cpu上交替执行,在整体上看来像是多个线程在同时做事,其实只是它们之间切换的快。
所以在单核单cpu的情况下,多线程可能还不如单线程,多线程还涉及到线程状态的切换带来的资源消耗,单也的确提高了cpu的利用率。
并行 并行是指,在同一时刻同时干两件事。在多核多cpu的情况下,多线程可能分布在不同的逻辑处理器中,它们不需要竞争cpu的资源。
一般开启多线程,这个线程数也是有讲究的,比如我的电脑是1个cpu,4内核,8个逻辑处理器,一个逻辑处理器一个线程,说明1个内核可以支持2个超线程。所以是8线程比较合适。
源码&原理解析
在解析put方法之前必须要知道的核心操作方法和核心全局变量
依赖的类
Unsafe类
Unsafe在java.util.conncurrent包下的类中占距很重要的位置,里面大多是CAS操作。Unsafe类使得java拥有像C语言指针一样操作内存空间的能力。 Unsafe可以直接操作内存,速度会更快,在并发的条件下能提供更好的效率。但也意味着不安全,不受jvm管理,无法被GC,需要自己释放,一个不小心会内存泄漏。 Unsafe类中大多方法被native修饰,意思是这些方法调用的都是c语言实现接口。
/** * 比较地址相较于对象o地址偏移l 处的值是否等于o1,是则修改其值为o2,并返回true。 * 否,则返回false * params: * o-比较交换的对象实例 * l-偏移量,在o地址的基础上的偏移量 * o1-期望值,即期望主存中o的值 * o2-修改值 */ public final native boolean compareAndSwapObject(java.lang.Object o, long l, java.lang.Object o1, java.lang.Object o2); /** * 这个方法与上一个方法的区别是,要比较修改的是一个int类型 */ public final native boolean compareAndSwapInt(java.lang.Object o, long l, int i, int i1); /** * 这个方法与上一个方法的区别是,要比较修改的是一个long类型 */ public final native boolean compareAndSwapLong(java.lang.Object o, long l, long l1, long l2); /** * 获取主存中o对象中偏移l的值 */ public native java.lang.Object getObjectVolatile(java.lang.Object o, long l); /** * 对象o中偏移l处的值修改为o1,并对其他线程立马可见 */ public native void putObjectVolatile(java.lang.Object o, long l, java.lang.Object o1); /** * 获取数组的第一个元素的地址,即数组首地址,知道c语言指针的应该秒懂。 */ public native int arrayBaseOffset(java.lang.Class<?> aClass); /** * 获取数组元素所占字节个数,比如int[]数组,int类型占4个字节,返回4 */ public native int arrayIndexScale(java.lang.Class<?> aClass); 12345678910111213141516171819202122232425262728293031323334
concurrentHashMap中对Unsafe的使用
// Unsafe mechanics private static final sun.misc.Unsafe U; //sizeCtl的偏移量 private static final long SIZECTL; //transferIndex的偏移量 private static final long TRANSFERINDEX; //baseCount的偏移量 private static final long BASECOUNT; //cellsBusy的偏移量 private static final long CELLSBUSY; //cellValue的偏移量 private static final long CELLVALUE; //数组首地址偏移量 private static final long ABASE; //表示数组元素所占字节大小1<<ASHIFT private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; //获取sizeCtl字段的偏移量 SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); /** * 众所周知int类型占4个字节,一个字节8位二进制。4个字节是32个二进制 * Integer.numberOfLeadingZeros(n) * 这个方法返回的是32个二进制中,从左往右数,到n最高位之间有多少个0;n=8,该方法 * 会返回29; * 31-Integer.numberOfLeadingZeros(scale)则可以获得,从右往左数到n最高位之间有 * 多少位。 * 31-Integer.numberOfLeadingZeros(8)=3 */ ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } } 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
关于ABASE,ASHIFT是用来获得数组元素的偏移量,根据偏移量访问并操作数组元素。 ABASE是数组首地址,ASHIFT表示数组元素占1<< ASHIFT个字节数; 举个例子:假设有个long[]数组,首地址ABASE=16,long占8个字节,那么ASHIFT=3,下标为i=3的元素的地址是 16+(3-0)×8=16+3×8=40=ABSE+(i-0)×(1<< ASHIFT)=ABASE+i<< ASHIFT
ps: 在sun公司的源码中,只有计算机的二进制运算,比如&、|、^、<<、>>>,因为相较于×、÷,二进制运算快得多。所以ASHIFT的存在只是为了方便二进制运算
比较重要的共享变量
sizeCtl变量
这个变量简直贯穿ConcurrentHashMap扩容啊,我当时为了搞清它,都卡了好久。
sizeCtl 这个变量是个多线程共享变量,-1的时候代表有线程在初始化数组。 大于0时记录的是触发扩容的阈值。 小于-1代表有(-sizeCtl-1)条线程在协调扩容:英文注释里是这么讲的,起初我也这么单纯的信了,but就是因为我信了这句话,怎么看都觉得addCount方法的代码有问题。甚至和这句话自相矛盾,我总觉得这是个坑。这句话划掉,简直干扰我思路。 小于-1代表有线程在扩容:👈改成这个
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */ private transient volatile int sizeCtl; 123456789
table[]变量
当前使用的数组对象
/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. */ transient volatile Node<K,V>[] table; 12345
nextTab[]变量
扩容时指向的新数组对象,扩容完成后置为null
/** * The next table to use; non-null only while resizing. */ private transient volatile Node<K,V>[] nextTable; 1234
baseCount 变量
在put成功后通过CAS(Unsafe.compareAndSwapLong()方法)自增Unsafe,如果CAS失败,说明有其他线程竞争,那么直接使用LongAdder的方法来计数,这涉及到另一个变量counterCells
/** * Base counter value, used mainly when there is no contention, * but also as a fallback during table initialization * races. Updated via CAS. */ private transient volatile long baseCount; 123456
counterCells 变量
计数器数组,思想是longAdder原理。详细下面addCount()方法解析时会详细讲。 总之,concurrentHashMap的 s i z e = b a s e C o u n t + ∑ i = 0 n c o u n t C e l l [ i ] size = baseCount + \sum_{i=0}^{n}countCell[i]siz**e=baseCount+∑i=0ncountCell[i]
/** * Table of counter cells. When non-null, size is a power of 2. */ private transient volatile CounterCell[] counterCells; 1234
transferIndex
在扩容时,需要处理的链表(链表数组嘛)所在的下标范围最大值+1,或者说要处理的链表的个数。比如,旧数组长度是16,扩容时,nextTab.length=32,那么要处理节点的总范围是第一个到第32个,即transferIndex=32.
/** * The next table index (plus one) to split while resizing. */ private transient volatile int transferIndex; 1234
ConcurrentHashMap.Node.hash 一些值的含义
· -1 该节点所处链表正在扩容处理中 · -2 是一颗红黑树的根节点
/* * Encodings for Node hash fields. See above for explanation. */ static final int MOVED = -1; // hash for forwarding nodes static final int TREEBIN = -2; // hash for roots of trees 123456
内部类Node的几种子类
ConcurrentHashMap.Node.java 类
继承自Map.Entry。一般正常情况下ConcurrentHashMap的元素节点。 这种Node的hash值一般大于0.
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); } public final boolean equals(Object o) { Object k, v, u; Map.Entry<?,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } /** * Virtualized support for map.get(); overridden in subclasses. */ Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } } 12345678910111213141516171819202122232425262728293031323334353637383940414243444546
ConcurrentHashMap.ForwardingNode.java 类
在对某个一个槽的链表进行扩容移动期间会将头节点转换成ForwardingNode类型的Node。ForwardingNode的hash值一般是-1.
具体操作是,当将一个卡槽的数据已经移动到新数组中之后,但整体的扩容流程还未完成。会将旧数组中已经移动数据的卡槽设置为ForwardingNode类型,而这个类型里面持有一个nextTable对象(即新数组对象的引用)。当某个线程需要get旧数组原来这个槽位的数据的时候,可以通过就是ForwardingNode中nextTable引用到新数组中查找。
/** * A node inserted at head of bins during transfer operations. */ static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes //当前正在扩容时,所以从扩容后的新数组对象里找 outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; //寻找的节点所在的卡槽链表或树在新数组中不为null for (;;) { int eh; K ek; //该卡槽的链表或树已完成移动 if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; //hash<0,两种情况,1:在扩容,即ForwardingNode类型,2:红黑树,hash=-2 if (eh < 0) { //扩容移位中,跳下一次循环,知道该卡槽移位完成 if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } //红黑树 else return e.find(h, k); } //寻找到最末尾都没能找到对应的元素 if ((e = e.next) == null) return null; } } } } 12345678910111213141516171819202122232425262728293031323334353637383940414243
ConcurrentHashMap.TreeBin.java
一整个红黑树引用对象,保存了root,还维护了一条节点顺序链表,这点就有点像那个B+Tree,所有节点的数据都在叶子节点并且还形成一条链表。 对红黑树的操作都在这里面。
static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; //链表头 volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock //还有很长很长的代码略过 ...... } 12345678910111213
本人实在不想贴那么巨长的代码,前面写过一篇红黑树原理的👉红黑树原理和算法
ConcurrentHashMap.TreeNode.java
TreeNode是用来维护红黑树中单个节点的。
/** * Nodes for use in TreeBins */ static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super(hash, key, val, next); this.parent = parent; } Node<K,V> find(int h, Object k) { return findTreeNode(h, k, null); } /** * Returns the TreeNode (or null if not found) for the given key * starting at given root. */ final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) { if (k != null) { TreeNode<K,V> p = this; do { int ph, dir; K pk; TreeNode<K,V> q; TreeNode<K,V> pl = p.left, pr = p.right; if ((ph = p.hash) > h) p = pl; else if (ph < h) p = pr; else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; else if (pl == null) p = pr; else if (pr == null) p = pl; else if ((kc != null || (kc = comparableClassFor(k)) != null) && (dir = compareComparables(kc, k, pk)) != 0) p = (dir < 0) ? pl : pr; else if ((q = pr.findTreeNode(h, k, kc)) != null) return q; else p = pl; } while (p != null); } return null; } } 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
构造方法
这部分和HashMap一致,我就贴贴源码了,具体看我解析java8 HashMap源码的文章😂
· 无参构造方法
/** * Creates a new, empty map with the default initial table size (16). */ public ConcurrentHashMap() { } 12345
· 有参构造方法1
与HashMap不同的一点是,将初始数组长度赋给sizeCtl
/** * Creates a new, empty map with an initial table size * accommodating the specified number of elements without the need * to dynamically resize. * * @param initialCapacity The implementation performs internal * sizing to accommodate this many elements. * @throws IllegalArgumentException if the initial capacity of * elements is negative */ public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } 123456789101112131415161718
tableSizeFor方法说明
· 有参构造方法2
/** * Creates a new map with the same mappings as the given map. * * @param m the map */ public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } 123456789
· tableSizeFor
与HashMap一样的原理,详见解析java8 HashMap源码的文章
/** * Returns a power of two table size for the given desired capacity. * See Hackers Delight, sec 3.2 */ private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } 12345678910111213
put方法
重磅级,真的很复杂,光这个一个方法和调用的方法我都整了好久。
put(key,value)
/** * Maps the specified key to the specified value in this table. * Neither the key nor the value can be null. * * <p>The value can be retrieved by calling the {@code get} method * with a key that is equal to the original key. * * @param key key with which the specified value is to be associated * @param value value to be associated with the specified key * @return the previous value associated with {@code key}, or * {@code null} if there was no mapping for {@code key} * @throws NullPointerException if the specified key or value is null */ public V put(K key, V value) { return putVal(key, value, false); } 12345678910111213141516
initTable()
用到的全局共享变量sizeCtl。 功能:初始化数组,和sizeCtl变量为触发扩容阈值
/** * Initializes table, using the size recorded in sizeCtl. */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //CAS操作标志,首先加上while循环,当数组尚未被初始化时,会一直尝试去初始化数组 while ((tab = table) == null || tab.length == 0) { /**sizeCtl<0,两种可能,1)-1:初始化中;2)<-1:扩容中 * 由上一步while循环的判断条件可知,显然是有其他线程在初始化中。 * 所以当前线程让步,等待另外一条线程执行初始化完成。 */ if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin /**比较主存中sizeCtl是否还是上一步获取的值,是则说明当前没有其他线程正在初 * 始化中,修改其值为-1; * 否,则说明已有其他线程在初始化中,继续循环 */ else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //再次检查是否有其他线程初始化完成 if ((tab = table) == null || tab.length == 0) { /**在未初始化之前,sizeCtl>0还有种情况,即调用了有参构造方法1时 * 初始化为ConcurrentHashMap的初始大小 */ int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { /** * 若try代码块执行成功,是修改共享变量值为触发扩容阈值; * 若try代码块执行失败,是将sizeCtl的值还原 **/ sizeCtl = sc; } break; } } return tab; } 123456789101112131415161718192021222324252627282930313233343536373839404142
casTabAt(tab,i,c,v)
使用Unsafe,CAS修改tab[i]值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } 1234
tabAt(tab,i)
使用Unsafe,获取主存中tab[i]的值
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } 123
putVal(key,value,onlyIfAbsent)
/** Implementation for put and putIfAbsent * onlyIfAbsent false-代表如果key键存在,并且已有相应的vlaue值,覆盖旧值 * true-代表不覆盖旧值 */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //计算hash值 int hash = spread(key.hashCode()); // int binCount = 0; //最外层一个循环,CAS必备 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //若数组尚未初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //hash寻址到的i位置为null else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //CAS操作,若tab[i]==null,则在该位置添加新节点 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) //添加成功,跳出循环 break; // no lock when adding to empty bin } //当前链表被标记已移动,说明当前数组正在扩容中 else if ((fh = f.hash) == MOVED) //当前线程协助扩容,helpTransfer方法解析下面解析扩容时详细讲 tab = helpTransfer(tab, f); //tab[i]处存在链表,且不处于扩容中 else { V oldVal = null; //对该处链表头节点加锁,往链表中添加元素还是要保证原子性的 synchronized (f) { /**再次检查,tab[i]的位置是否还是f头节点,因为有可能其他线程在加锁 * 之后,判断之前有其他线程扩容操作,导致tab[i]的链表产生变化 */ if (tabAt(tab, i) == f) { //节点hash值大于0,直线型链表 if (fh >= 0) { //链表节点个数 binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //若该节点的hash、key、value值与即将要插入的key-value相同 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; //覆盖 if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //如果已遍历到了链表尾部,直接讲新节点插入链表尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //如果是一棵树 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //若链表节点个数大于TREEIFY_THRESHOLD,讲链表变成红黑树 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } /** * concurrentHashMap的元素个数加1 * binCount在addCount()中是个标识,在线程竞争不激烈的时候,binCount>0都得检查是否 * 需要扩容;线程竞争紧张的时候,大于1才检查是否需要扩容 */ addCount(1L, binCount); return null; } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
这个方法里面的spread(key.hashCode()) 方法和HashMap方式一毛一样,就不介绍了 这个方法主要做的事有: · 计算key的hash值
· 一个外层循环,CAS必备,方便失败后重新来过( ̄▽ ̄)*
· 在循环体内执行
· 如果数组还未初始化,调用initTable方法,然后继续下一轮循环
· 如果使用hash与长度运算得到下标i处位置为 null(tabAt方 法),使用CAS操作(casTabAt方 法)将新节点插入tab[i]的位置,如果CAS操作失败继续下一轮循环;成功则跳出循环。
· 若tab[i]处链表正在因扩容而处理移位中,则当前线程进入协助扩容的方法中 (helpTransfer())
· 剩下的情况,就是数组已初始化,并且也没有处在扩容中,并且tab[i]位置已有链表/树; 那么接下来的操作自然就是将新节点插入链表/树中了
· 首先链表头节点/树根节点 加上线程锁,毕竟tab[]是一个共享变量,虽然使用了volatile修 饰,但是只能保证其立刻的可见性,但无法保证修改的原子性(详见线程内存模型),所以 需要加上线程锁. · 判断tab[i]处节点的hash值(详见节点特殊hash值含义),如果是链表型直接由头往后找, 如果是一棵树,则调用插入红黑树的方法 · 判断链表型是否需要转换成一颗红黑树 · 调用addCount方法
关于代码中U.CompareAndSwap*开头的方法具体作用看unsafe类相关方法
concurrentHashMap计数器
这个必须拿出来单独说,我光研究都研究了半天,ConcurrentHashMap的计数器实现的原理和LongAdder一样,它的计数器是用来在多线程的情况下统计元素个数。
计数器大体思路 : 它包含两个东西:baseCount ,countCell[] · 一开始线程竞争不激烈的时候,使用CAS操作baseCount 增加数值; · 当CAS操作失败后,说明有线程在竞争,那么线程就会开始操作countCell[] ; · 线程产生一个随机数r,与数组长度计算 r&(length-1) 得出下标i ,对countCell[i] 进行增减操作。原来是多个线程操作一个baseCount 的压力,分摊到countCell[] 数组中,分而治之! · 当想获取总数时,只需要将baseCount和countCell[]所有的数值相加即可==> b a s e C o u n t + ∑ i = 0 n c o u n t C e l l [ i ] baseCount + \sum_{i=0}^{n}countCell[i]baseCount+∑i=0ncountCell[i] 。
addCount(x,check) 方法
功能 :给ConcurrentHashMap元素个数计数。 看英文注释的意思是 :计数,如果数量达到阈值,将会触发扩容。如果已经有线程在扩容中,则当前线程也会加入帮助扩容。扩容完成后,再次统计计数值,原因是因为有些在putVal方法那一步发现有线程在扩容,直接进入helpTransfer方法帮助扩容,直到完成后才继续put元素。 参数 : long x => 新添加元素的个数
int check => 如果< 0,不检查是否需要扩容。如果<=1,在线程竞争不激烈的时候检测是否需要扩容。
实际上我看代码的逻辑是如果线程竞争激烈,对countCell[]计数完后,直接return,不判断check,不做扩容检测处理。 为什么呢?嗯,激发了我的好奇。 原因: 由putVal可知,check=1代表tab[i]处链表只有一个节点;check=2代表tab[i]处可能是一颗红黑树也有可能是一个有两个节点的链表;check>1代表tab[i]处有多个节点的直链表。 如果线程进入到了fullAddCount方法,说明线程竞争特别激烈了(高并发)。大量程都在计数这处循环,如果这时计数完成后还检测扩容,高并发又会延续到扩容处,程序持续压力大。众所周知,高并发并不是好事,占用大量资源,我们要尽可能避开它。所以计数完成后直接return,尽快结束高并发。等到高并发结束后的第一个线程来检测是否需要扩容。
ThreadLocalRandom
作用:在多线程环境下生成随机数。相较于Random多个线程争夺一个seed种子,ThreadLocalRandom每条线程独占一个seed种子,从而大幅提高性能。
ThreadLocalRandom.getProbe()
作用获取线程Thread的探测值,如果ThreadLocalRandom没有执行localInit方法,线程的探测值为0(即当前线程还没生成过随机数)
关于代码中U.CompareAndSwap*开头的方法具体作用看unsafe类相关方法
/** * Adds to count, and if table is too small and not already * resizing, initiates transfer. If already resizing, helps * perform transfer if work is available. Rechecks occupancy * after a transfer to see if another resize is already needed * because resizings are lagging additions. * * @param x the count to add * @param check if <0, don't check resize, if <= 1 only check if uncontended */ private final void addCount(long x, int check) { CounterCell[] as; long b, s; //若CountCells[](计数器数组不为null)或者CAS修改baseCount失败 👉 说明当前有并发 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; //无多个线程竞用标志 boolean uncontended = true; /** * 计数器数组尚未初始化 或 根据当前线程的探测值与数组长度运算得到的计数器数组卡槽为null * 或者数组下标i处不为null,但CAS修改as[i]失败。进入fullAddCount方法 */ if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); //能进入到这里说明线程较多程序繁忙,不进行扩容检测 return; } /**到这一步说明程序还是有些繁忙,毕竟CAS修改baseCount * 是失败的。check的含义由putVal的方法可知,check=1代表 *tab[]数组中添加了一个节点数只有1的链表,说明没有产生 *碰撞,并且当前存在并发,所以这种情况,暂时不检测扩容 * */ if (check <= 1) return; /** * 统计总数 */ s = sumCount(); } /** * 到这步,说明当前没有并发。程序不繁忙,可以进行扩容检测。 */ if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; //当前数据个数达到阈值,并且数组已初始化,并且数组长度没有达到最大长度 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { /** * 利用 Integer.numberOfLeadingZeros(n)|1<<15 生成一个第16位是1的数值。 * 比如 0000 0000 0000 0000 1000 0000 0010 1101 */ int rs = resizeStamp(n); //当前concurrentHashMap已经在扩容中 if (sc < 0) { /** * RESIZE_STAMP_SHIFT=16 * (sc >>> RESIZE_STAMP_SHIFT)!=rs: true-扩容已结束 * sc==rs+1、sc==rs+ MAX_RESIZERS : 这个两个我是想不通了,我个人认为这个不正确 *ㄟ( ▔, ▔ )ㄏ,我觉得应该是sc==sc+1,sc=sc+MAX_RESIZERS才是 * (nt=nextTable)==null、transferIndex<=0: 扩容结束标志 */ if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //扩容尚未结束,当前线程加入扩容中,sc=sc+1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } /** * RESIZE_STAMP_SHIFT =16 *众所周知,int数据类型占4字节32位二进制,且在计算机最高位标识符号位,1-负数、0-正数。 *第一条线程扩容时,将sizeCtl修改个高16位生成一个标记戳,并且最高是1(即负数),因为rs第16位是1。也刚好满足:在 * 扩容时,sizeCtl是一个负数。 */ else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); /**第一条线程开启扩容在结束之前,后面会陆陆续续有 *其他线程进入扩容,也包括在putVal方法中还没插入元 *素之前进入扩容的。所以需要重新统计个数 */ s = sumCount(); } } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
fullAddCount(long x,boolean wasUncontended)
当处于高并发情况时,大量线程对baseCount变量进行修改,造成性能损耗,并且CAS修改的次数会比较多,毕竟每次只有一个能成功,失败了要循环一次再次尝试CAS修改,循环消耗也大。对于这种情况采用分而治之的办法,多个线程修改多个变量,各改各的。每个线程随机选择countCell[]的一个槽位对其进行增加。
关于代码中U.CompareAndSwap*开头的方法具体作用看unsafe类相关方法
// See LongAdder version for explanation private final void fullAddCount(long x, boolean wasUncontended) { int h; //若当前线程还没有被ThreadLocalRandom初始化 if ((h = ThreadLocalRandom.getProbe()) == 0) { //初始化当前线程的probe的探测值和seed种子 ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } //如果最后一个槽为非空则为true boolean collide = false; // True if last slot nonempty //自旋 for (;;) { CounterCell[] as; CounterCell a; int n; long v; //若计数器数组已初始化 if ((as = counterCells) != null && (n = as.length) > 0) { /**使用当前线程的探测值寻址到下标i,若数组下标i处数值为null */ if ((a = as[(n - 1) & h]) == null) { /** * cellBusy=1:代表countCell[]正在添加计数单元或者在扩容 *cellBusy=0:其他 */ if (cellsBusy == 0) { // Try to attach new Cell //尝试添加新计数单元 CounterCell r = new CounterCell(x); // Optimistic create //CAS方式修改cellBusy的值为1 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //已添加好标志 boolean created = false; try { // Recheck under lock //再次检测数组是都已初始化(double check) CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } //已添加好新计数单元,跳出自旋体 if (created) break; //若添加失败,继续自旋尝试修改 continue; // Slot is now non-empty } } collide = false; } /**由addCount方法我们知道,当wasUncontended=false * 时,代表使用原来的探测值计算得到卡槽,对其CAS修改失 * 败。所以这就代表这该处卡槽有其他线程在竞争,则修改 *当前线程的探测值,重新找一处卡槽 */ else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //这种情况代表寻找到的卡槽已有值,目前还没有竞争,在原来的基础上叠加值 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; /** *上一个else if CAS修改计数单元失败 *而数组已扩容或已达到数组长度最大值.NCPU是系统逻辑 * 处理器的个数,一个逻辑处理器一个线程,假设NCPU=8 *。那么countCell[]数组最大长度maxLen>=8,代表同一 *时刻,只会由8条线程成功修改数组里的计数单元。 */ else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale /** * 走到这一步,说明上一个else if条件没满足(数组可以扩容),标记collide=true,便 *于下一次循环进入扩容的else if中 */ else if (!collide) collide = true; /** * 没有其他线程在添加新计数单元或扩容。标记cellsBusy */ else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { /**再次检测计数器数组是否已扩容,当前线程 *工作内存中的countCell[]是否已过时 */ if (counterCells == as) {// Expand table unless stale //扩容,并复制数组元素 CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; //无论扩容是否成功,进入下一次循环再次尝试CAS修改 continue; // Retry with expanded table } //重置当前线程的探测值,重新寻找卡槽 h = ThreadLocalRandom.advanceProbe(h); } //数组还没有初始化,并且没有其他线程在初始化 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table //多线程情况下基本操作double check if (counterCells == as) { //初始长度为2 CounterCell[] rs = new CounterCell[2]; //将当前线程的计数放入数组中 rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } /** * 修改cellBusy失败,说明有其他线程在对计数器数组进行 * 操作,说明压力到计数器这里来了,那么代表着baseCount * 修改的压力变小。再次尝试对baseCount进行修改 */ else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
sumCount()
功能:计算ConcurrentHashMap的元素个数
final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; } 1234567891011
扩容
concurrentHashMap的重点和难点就在这里,如何控制多线程下的并发扩容?
helpTransfer
功能:如果当前这个ConcurrentHashMap实例正在扩容中,当前线程进入扩容方法,协助其他线程扩容。
/** * Helps transfer if a resize is in progress. */ final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; //若是正在进行扩容,当前线程加入一起扩容 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { /** * 接下来的内容和前面解析addCount()方法一样 */ int rs = resizeStamp(tab.length); //判断当前是否正处在扩容中 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //判断扩容是否已结束 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //sc加1,当前线程进入扩容方法协助扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } //扩容结束后返回扩容后数组引用 return nextTab; } //如果线程进入该方法的时候就已完成扩容,直接返回当前使用数组对象 return table; } 12345678910111213141516171819202122232425262728293031
transfer
接下来,重点呐。
在我刚开始解析ConcurrentHashMap扩容源码的时候,我一直疑问,在扩容的时候,如何保证get()方法获取数据的准确性。后面终于它的处理方式就是: · 当前数组对象tab指向的地址引用不变。 · 持有一个专门在扩容时使用的nextTable引用指向新数组在内存中的地址。 · 成功将某个槽位的数据移动到新数组nextTable之后,在当前使用的数组tab的那个槽位中插入一个桥梁,该桥梁通向新数组。 · 所以当线程需要读取当前使用数组tab该槽位的数据时,会获取到该桥梁,经由桥梁到达新数组,在新数组中查找。
该桥梁就是👉 ForwardingNode类
大致处理思路:
· 扩容要处理的下标范围在 0 ~ (tab[].length-1)之间(即旧数组的下标范围)
· 每个线程进入transfer方法,首先计算要处理的下标区间段。比如旧数组长度是28,那么要处理的下标总区间是[0,28-1]。再根据设备的 逻辑处理器核数(与能并行的线程数密切相关) 计算出每个线程要处理槽的个数,假设某设备逻辑处理器个数为8,得出线程要处理的槽的个数为16。
具体多线程实例场景如下↓
· 假设第一个线程a进入扩容方法,首先要计算它要处理的卡槽的个数为16,区间段为[240,255],并标记还未处理的个数为256-16=240个,则剩下未处理的总区间是[0,239]。然后开始处理这个区间的数据,处理完后检测发现如果有其他线程也在扩容,直接结束它的扩容之旅。 · 第二个线程b进入扩容方法,计算得出要处理的卡槽数为16,区间段[224,239],并标记剩余未处理个数为224个,则剩余未处理的总区间为[0,223]。开始处理这个区间。处理完后检测发现如果有其他线程也在扩容,直接结束它的扩容之旅。 · 每个线程进入扩容都如此,直到剩余未处理的个数为0。
如果是单线程的情况如下↓
· 线程a进入扩容,首先得到当前剩余未处理个数为28,总区间为[0,28-1]。要处理卡槽个数为16,要处理区间为[240,255],标记当前剩余未处理个数为240。然后开始处理该部分区间。 · 当循环处理完[240,255]后,检测发现当前剩余未处理个数还是240个,说明当前并没有其他线程参与扩容。那么当前线程a继续计算下一个要处理得区间为[224,239],标记剩余未处理个数为224个。接着往下处理这个区间。 · a线程结束扩容的触发点是有两个 ①当它每一次处理完一个区间后,发现之前标记的剩余未处理的个数和现在从主存中获取的值不一样,直接结束,相当于交接给另外一个线程。②当剩余未处理个数为0的时候,即所有数据都完成扩容迁移的时候,结束。
/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { /** * n:旧数组长度,stride线程要处理的卡槽的个数 */ int n = tab.length, stride; /** *计算要处理卡槽的个数 * NCPU:逻辑处理器个数(一个逻辑处理器任一时刻只能支持一条线程运行) * MIN_TRANSFER_STRIDE:一个线程最小处理卡槽的个数 16 */ if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //nextTab参数为null说明扩容才刚刚开始,初始化一些扩容相关的全局共享变量 if (nextTab == null) { // initiating try { //数组长度扩大2倍 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME //数组扩大2倍后,内存溢出。所以后续是不能再扩容了,直接将阈值设置为Integer类型中的最大值,这样后续是很难触发扩容的。 sizeCtl = Integer.MAX_VALUE; return; } //将全局变量nextTable指向新的数组对象 nextTable = nextTab; //可以理解为剩余未处理卡槽个数 transferIndex = n; } int nextn = nextTab.length; //扩容时期旧数组到新数组的桥梁 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; //整个扩容过程是否完成 boolean finishing = false; // to ensure sweep before committing nextTab //自旋 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //计算处理下标区间,并标记剩余未处理槽位个数 while (advance) { //nextBound为区间的最小值,nextIndex-1为区间的最大值 int nextIndex, nextBound; //--i>=bound 代表区间的数据已处理完,finishing=true代表整体扩容已完成 if (--i >= bound || finishing) advance = false; //transferIndex<=0代表所有槽位都已处理或在处理中 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //修改transferIndex,可以理解为修改尚未处理的槽位的个数为原来未处理的个数减去当前线程要处理的个数 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } /** *处理下标等于或超出整体处理区间[0,n]的边界 *三种情况: *①处理区间[0,stride-1]的那条线程,处理完成。但有可能整体的扩容进程还没完成 *②由上一个while可知,当所有槽位都有线程在处理中时,将i设置为-1 *③在该循环体内,第二个if体内的再次检测,在多线程开发中必不可少的double-check */ if (i < 0 || i >= n || i + n >= nextn) { int sc; //如果这时所有的线程已处理完扩容 if (finishing) { //将全局共享变量nextTable置为null,这样其他程可以依据这个判断扩容是否已完成 nextTable = null; //新数组正式投入使用 table = nextTab; //将sizeCtl设置为下次触发扩容阈值 sizeCtl = (n << 1) - (n >>> 1); return; } //sc=sc-1,代表当前线程处理完相应的它负责的区间 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { /** *由addCount方法的解析可知,第一条线程开始扩容时,会将 *sc设置为(resizeStamp(n)<<RESIZE_STAMP_SHIFT)+2. *后面加入扩容的线程,都会将sc=sc+1. *当sc-2==resizeStamp(n)<<RESIZE_STAMP_SHIFT时,代表 *除了当前线程外,其他所有的线程都已完成对应区间的扩容。 *反之,还有些线程没完成扩容处理。 */ if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; /** *走到一步,说明所有线程都完成扩容处理,但是保险起见,需要再次检测一次 */ finishing = advance = true; i = n; // recheck before commit } } /** *tab[i](旧数组i槽位)没有数据,直接放一个Forwarding类型数据(即一个指向新数组的桥梁),然后下一次循环,处理下一个槽位 */ else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); /** *该槽位的数据已被处理过,设置advance=true,即下一次循环进入 *计算处理区间代码判断是否需要继续处理下一个区间 */ else if ((fh = f.hash) == MOVED) advance = true; // already processed /** * 该槽位的数据尚未被动过 */ else { /** *移动拆分链表/树 数据是一个写/修改公共变量的内存的操作, *并且步骤有多步,由前面讲的CAS的知识可知,需要保证其原 *子性,防止ABA问题的产生 */ synchronized (f) { //多线程环境下必备的double-check,确保该槽位的数据还是原来的那个数据 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; //直线型链表 /** * 这部分代码解析呢,建议去看本人对hashMap的解析, *特别详细,刚入门的都能看懂 */ if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //在旧数组i槽位,放入一个桥梁 setTabAt(tab, i, fwd); /* *即下一次循环进入计算处理区间代码判断是否需要继续处理下一个区间 */ advance = true; } //红黑树建议看本人红黑树原理解析,大致就明白了 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //判断拆分后的红黑树是否需要拆解转换成直链表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210