a、JDK1.7.80源码
ConcurrentHashMap基本结构图(其实就是把HashMap进行切分通过Segment进行管理)
1、Segment结构及参数及方法
-
基本类结构(继承了ReentrantLock,可以基于ReentrantLock对Segment进行相应的并发控制)
static final class Segment<K,V> extends ReentrantLock implements Serializable {
-
Segment基本参数
// 获取锁的最大次数,多核处理器为64,单核处理器为1 static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; // Segment对象的HashEntry数组 transient volatile HashEntry<K,V>[] table; // Segment对象中节点的数量,用来算size与判断isEmpty transient int count; // 操作Segment对象的次数,统计size与判断isEmpty transient int modCount; // 下一次要扩容的阀值(该值=capacity * load factor) transient int threshold; // table的加载因子 final float loadFactor;
-
HashEntry基本信息
基本参数(与HashMap的Entry节点一致)
final int hash; final K key; volatile V value; volatile HashEntry<K,V> next;
-
Segment构造器
Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf;//设置加载因子 this.threshold = threshold;//设置扩容阀值 this.table = tab;//表数组设置 }
-
Segment的put方法
// 可能多个线程同时调用一个Segment对象的put方法 final V put(K key, int hash, V value, boolean onlyIfAbsent) { /** * tryLock():非阻塞,尝试获取锁 * lock():阻塞加锁 * 下面操作加不上锁会导致CPU爆满,优点可以在循环内做相应业务逻辑 * while(!tryLock()) { * // 业务逻辑 * } */ HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; // 定位到Segment下面数组的下标 int index = (tab.length - 1) & hash; // 获取数组index下标节点(首节点) HashEntry<K,V> first = entryAt(tab, index); // 遍历该节点下链表 for (HashEntry<K,V> e = first;;) { if (e != null) { // 链表不为空 K k; // hash值相等且equals相等 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; // 统计操作集合次数,fair-fast机制 } break; } e = e.next; //遍历 } else { if (node != null) // 头插法 node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); // 超过扩容阀值进行数组扩容 else // 使用UNSAFE.putOrderedObject方法插入 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // ReentranLock必须手动释放锁 unlock(); } return oldValue; } // 获取数组指定下标节点 static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) { return (tab == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)i << TSHIFT) + TBASE); }
-
获取锁的方法:scanAndLockForPut
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node // 尝试加锁的过程可以完成一些准备工作 while (!tryLock()) { // scanAndLock方法流程一致 HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { // 当前table数组当前下标未有节点或遍历完了 if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) // 重复key节点 retries = 0; else e = e.next; // 依次遍历链表 } else if (++retries > MAX_SCAN_RETRIES) { lock(); // 阻塞加锁 break; // 获取成功 } // (retries & 1) == 0偶数才会成立,1.7采用头插法,观察首节点有没有被修改 else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; // 重新遍历链表 } } return node; } // 根据segment对象和hash值得到segment对象下对应的table数组对应的节点(首节点) static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) { HashEntry<K,V>[] tab; return (seg == null || (tab = seg.table) == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); }
-
scanAndLock方法与上述scanAndLockForPut方法的中间步骤其实是一致的
private void scanAndLock(Object key, int hash) { // similar to but simpler than scanAndLockForPut HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; int retries = -1; while (!tryLock()) { HashEntry<K,V> f; if (retries < 0) { if (e == null || key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; retries = -1; } } }
-
数组扩容方法:rehash
private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; // 按倍扩容 int newCapacity = oldCapacity << 1; // 计算下一扩容阀值 threshold = (int)(newCapacity * loadFactor); // 创建扩容后数组 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; // 遍历老数组 for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; // 计算新数组下标位置 int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; // 直接将老数组单个节点放入新数组对应下标即可 else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; // idx记录该链表首节点对应在新数组的下标位置 for (HashEntry<K,V> last = next; last != null; last = last.next) { // 计算当前节点在新数组的下标值(要么一样,要么在+oldCapacity位置) int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; // 实时记录不同槽位情况的下标 lastRun = last; // 实时记录最后一个不同槽位的节点 } } // lastRun~链表最后连续的节点都转移到新数组对应下标 newTable[lastIdx] = lastRun; // 遍历首节点~lastRun节点,将剩余依次转移到新数组对应下标 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; // 重新计算下标 HashEntry<K,V> n = newTable[k]; // 头插法 newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } // 新老数组转移结束 // 新节点新数组下标 int nodeIndex = node.hash & sizeMask; // add the new node、 // 头插法 node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; //指针指向新表 }
-
remove方法
final V remove(Object key, int hash, Object value) { if (!tryLock()) // 获取锁操作 scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> e = entryAt(tab, index); HashEntry<K,V> pred = null; // 遍历链表,找到对应要remove的节点 while (e != null) { K k; HashEntry<K,V> next = e.next; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { if (pred == null) setEntryAt(tab, index, next); else // 直接指向下一个 pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); } return oldValue; }
-
replace方法
final V replace(K key, int hash, V value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V> e; for (e = entryForHash(this, hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; e.value = value;//只进行值的覆盖 ++modCount; break; } } } finally { unlock(); } // 返回老值 return oldValue; }
-
clear方法
final void clear() { lock(); try { HashEntry<K,V>[] tab = table; // 遍历数组 for (int i = 0; i < tab.length ; i++) setEntryAt(tab, i, null);//直接用null节点覆盖 ++modCount; count = 0; } finally { unlock(); } }
2、集合的基本参数
// 整个集合中节点总数的默认值
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 数组的扩容因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 支持的并发数量,其实就是Segment数组的大小
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 节点总数最大值
static final int MAXIMUM_CAPACITY = 1 << 30;
// Segment对象最小的节点数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// 最大的Segment对象数量,二进制16位
static final int MAX_SEGMENTS = 1 << 16;
/**
* 默认自旋次数,超过这个次数直接加锁,防止在size方法中由于不停有线程在更新map,导致无限的进行自旋影响性能。
* 只在size()/containsValue()方法使用
*/
static final int RETRIES_BEFORE_LOCK = 2;
// Segment对象掩码值,也就是Segment数组长度-1,用来与运算hash算法确定Segment数组下标
final int segmentMask;
/**
* 用于定位参与hash运算的位数,this.segmentShift = 32 - sshift;(构造方法中)
* 这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的
* sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次
* 所以sshift等于4
*/
final int segmentShift;
// Segment对象数组
final Segment<K,V>[] segments;
// 其他
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;
3、构造器方法
// 默认构造
public ConcurrentHashMap() {
// 16, 0.75, 16
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
// 提供初始大小构造
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
// 两个参数构造
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
// 终极构造,initialCapacity为整个ConcurrentHashMap中所有数组总共的Entry节点个数
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
// 基本信息校验
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
// 下面找到>=concurrencyLevel最小2次幂方数
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// segmentShift是用于定位参与hash运算的位数
this.segmentShift = 32 - sshift;
/**
* segmentMask是哈希运算的掩码,长度-1参与
* 为啥这里构造方法中就确定这个全局的segment掩码值,而HashMap中会在put时使用数组长度-1动态获取
* 因为一旦Segment对象长度指定之后,ConcurrentHashMap后续不会对Segment进行扩容,并发度不会改变
*/
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 初始节点总数量/segment对象数量=每个Segment对象下Entry数量
int c = initialCapacity / ssize;
/**
* 为啥先除再乘ssize呢?这里是向上取整
* 如initialCapacity=12时,ssize=8,c其实为1.5,但12/8=1,下述是向上取整操作
* 目的:为了保证最后创建出来的Entry数必须大于等于你想要创建的initialCapacity数量
*/
if (c * ssize < initialCapacity)
++c;//向上取整
int cap = MIN_SEGMENT_TABLE_CAPACITY;
// 确保cap为>=c的2的次方数
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
// ssize一定是2幂次方数,为了保证按位与的hash算法能均匀分布在[0,ssize]上
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
// 通过直接操作内存将s0对象放在Segment数组第0个位置
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
4、put方法
public V put(K key, V value) {
Segment<K,V> s;
// 不允许key为空
if (value == null)
throw new NullPointerException();
// 计算key的hash值
int hash = hash(key);
// 计算Segment数组的下标
int j = (hash >>> segmentShift) & segmentMask;
// 判断Segment的j位置是否为空,基于UNSAFE下的线程安全操作
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
/**
* Segment的j位置为空时进行Segment对象的创建
* 这里不在乎是哪个线程创建的,多线程下能创建成功即可
*/
s = ensureSegment(j);
// 调用Segment的put方法,与HashMap的put差不多,就保证了线程安全
return s.put(key, hash, value, false);
}
构建Segment对象方法:ensureSegment
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
// 获取Segment数组的偏移量
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
// 再次判断Segment数组在u位置是否为空
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 下面直接使用Segment数组的Segment[0]参数进行Segment[k]的构建
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
// 重复检查是否为空
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 使用CSA操作(无锁操作),将创建出来的Segment对象s放入到ss数组的u位置上去
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) //失败返回false,继续while循环
break;//成功直接返回
}
}
}
return seg;
}
5、get方法
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
// 通过hash值算出Segment数组的下标
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 遍历Segment对象的下Entry数组
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
// key的hash值相等且equals方法相等才返回
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
6、其他常用方法
size方法:
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
// 给所有Segment对象加锁
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
// 遍历Segment数组
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
// size为所有Segment对象的count和
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
// 给所有Segment对象释放锁
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
isEmpty方法:
public boolean isEmpty() {
long sum = 0L;
final Segment<K,V>[] segments = this.segments;
// 遍历Segment数组
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false; // 存在节点就返回false
sum += seg.modCount;
}
}
// 若Segment对象里节点的修改次数不为0(有修改),再重新检查一遍
if (sum != 0L) { // recheck unless no modifications
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum -= seg.modCount;
}
}
if (sum != 0L)
return false;
}
return true;
}
b、JDK1.8.0.231源码
ConcurrentHashMap基本结构图(数组+单向链表+双向链表+红黑树)
1、集合结构及基本参数
-
基本类结构
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
-
集合基本参数
// 数组最大长度 private static final int MAXIMUM_CAPACITY = 1 << 30; // 数组初始大小 private static final int DEFAULT_CAPACITY = 16; // 使用toArray方法最大数组的大小限制(超过会抛OOM异常) static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 链表转化为红黑树的阀值 static final int TREEIFY_THRESHOLD = 8; // 由红黑树还原为链表的阀值(为啥不是8呢?防止在临界值8的时候进行节点的增删操作,导致频繁转化,影响效率) static final int UNTREEIFY_THRESHOLD = 6; /** * 最小树形化容量阈值 * (当哈希表中的容量 > 该值时,才允许树形化链表,否则,若桶内元素太多时,则直接扩容,而不是树形化。 * 为了避免进行 * 扩容、树形化选择的冲突,这个值不能小于 4 * TREEIFY_THRESHOLD) */ static final int MIN_TREEIFY_CAPACITY = 64; // 默认最小步长 private static final int MIN_TRANSFER_STRIDE = 16; // sizeCtl的stamp bit private static int RESIZE_STAMP_BITS = 16; // 节点的几种状态 static final int MOVED = -1; static final int TREEBIN = -2; static final int RESERVED = -3; static final int HASH_BITS = 0x7fffffff; // 虚拟处理器数量(并不是实际CPU核心数量) static final int NCPU = Runtime.getRuntime().availableProcessors(); // 数组 transient volatile Node<K,V>[] table; // 扩容时下一个表 private transient volatile Node<K,V>[] nextTable; // 基本计数变量 private transient volatile long baseCount; // 数组初始化及扩容的阀值,初始值为0 private transient volatile int sizeCtl; // 转移元素时下一个坐标 private transient volatile int transferIndex; // 其他线程是否在操作counterCells数组,初始值0-未操作 private transient volatile int cellsBusy; // 统计操作数组-用来统计集合size private transient volatile CounterCell[] counterCells;
2、节点的结构:Node与TreeNode
Node节点基本结构
-
基本参数
final int hash; final K key; volatile V val; volatile Node<K,V> next;
-
基本方法
public final int hashCode() { return key.hashCode() ^ val.hashCode(); } // key、value都不为空且值与equals方法相等 public final boolean equals(Object o) { Object k, v, u; Map.Entry<?,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); }
3、构造器方法
// 默认构造
public ConcurrentHashMap() {
}
// 给定初始化大小构造器
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
// 使用现成集合构造
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
// 给定初始化数组大小与加载因子构造器
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
// 最终调用构造
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor); // 计算扩容阀值
int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap; // 扩容阀值
}
4、put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
// 实际调用
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 直接控制key和value不为空,1.7只控了value,但key为空时会在逻辑报错
if (key == null || value == null) throw new NullPointerException();
// 计算hash值
int hash = spread(key.hashCode());
int binCount = 0; // 统计链表节点个数
// 循环数组
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 初始化数组
// 计算key的数组下标:(n - 1) & hash, f为数组下标首节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 数组下标无节点
// CAS操作将新节点放入该位置
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // CAS成功终止循环,失败继续循环
}
else if ((fh = f.hash) == MOVED)
// 其他线程正在进行扩容操作,这里也需要辅助扩容保证线程安全
tab = helpTransfer(tab, f);
else { // 对链表进行遍历/操作
V oldVal = null;
synchronized (f) { // 对数组下标首节点进行加锁
if (tabAt(tab, i) == f) { // 检查当前数组下标首节点是否为f
if (fh >= 0) {
// 用来记录链表长度,来决定是否转化为红黑树
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break; // 若key节点存在,直接新值覆盖老值并返回老值
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 一直遍历到尾结点
// 尾插法
pred.next = new Node<K,V>(hash, key, value, null);
break; // 插入完成,终止循环
}
}
}
else if (f instanceof TreeBin) { // 为红黑树结构
Node<K,V> p;
binCount = 2;
// 进行红黑树节点插入
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
} // synchronized结束了,下面为啥不在锁里面进行呢?为了使treeifyBin方法更通用吧,在treeifyBin方法中通用也会对首节点加锁
if (binCount != 0) {
// 若链表长度达到链表转化红黑树阀值,进行转化
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); // 链表转化为红黑树
if (oldVal != null)
return oldVal;
break;
}
}
}
// 统计Node节点个数与并发扩容操作
addCount(1L, binCount);
return null;
}
初始化数组方法:initTable
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
/**
* yield是先放弃CPU资源,然后会再与其他线程竞争CPU资源
* 极端情况下:A线程让出后马上能竞争到CPU资源,也会引起CPU100%
*/
Thread.yield(); // lost initialization race; just spin
// CAS操作让sc-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 数组为空时才进行初始化
if ((tab = table) == null || tab.length == 0) {
// 默认大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 创建新数组
table = tab = nt;
// n-1/4n=0.75n(下一扩容阈值)
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
多线程下辅助转移节点:helpTransfer
// 如果其他线程正在转移节点中,新线程通过该方法进行并发辅助转移
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 每个线程进行辅助转移时,会将SIZECTL加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 新老数组进行节点转移
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
转移节点方法:transfer
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算步长
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // 新数组为空,进行初始化
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // n << 1成倍扩容
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 节点正在转移时的标记节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) { // bound为步长
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
// --i >= bound条件包含两个信息:1、--i表明是从后往前循环 2、>=bound是转移[bound, i]步长范围内的节点
if (--i >= bound || finishing)
advance = false; // 转移完成了,停止转移
// 将nextIndex设置为transferIndex的长度,transferIndex在下面会通过CAS操作更新的
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
/**
* compareAndSwapInt(Object o, long offset, int expected, int x);
* 读取传入对象o在内存中偏移量为offset位置的值与期望值expected作比较。
* 相等就把x值赋值给offset位置的值。方法返回true。不相等,就取消赋值,方法返回false。
* 下述操作会将TRANSFERINDEX偏移量的值transferIndex改为nextBound
*/
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
// 计算下一个步长的数组下标nextBound
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
/**
* 随着bound与i的动态计算,可以确定每次移动的节点范围是按照步长来的
* 老数组具体移动范围:[bound, i]
*/
bound = nextBound; // 数组步长下标
i = nextIndex - 1; // 结束下标
advance = false; // 终止循环
}
}
// n为老数组的长度,nextn新数组长度
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 当前线程已完成转移了
nextTable = null;
table = nextTab;
// 计算数组扩容阀值,(n << 1) - (n >>> 1) = 新数组长度 * 0.75
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// CAS给sizeCtl减1并且次数的sc已经赋值成sizeCtl了
/**
* CAS给sizeCtl减1并且次数的sc已经赋值成sizeCtl了
* 此时sc = sizeCtl = SIZECTL = resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
*/
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 这里其实就是判断现在的sc是否等于传进来初始值
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return; // 证明其他线程还在进行节点转移(helpTransfer方法进入的)
// 走到这里证明所有线程都已完成节点转移操作
finishing = advance = true; // 置为节点转移完成状态
i = n; // recheck before commit
}
}
// 若当前数组下标节点为空,直接放置一个已转移的标记节点fwd
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 当前节点正在转移中,继续其他节点转移
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else { // 正在进行节点转移
synchronized (f) { // 转移时对数组下标的首节点加锁
if (tabAt(tab, i) == f) { // 检查之前获取的节点未被改变
Node<K,V> ln, hn;
if (fh >= 0) { // 链表节点转移
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { // 红黑树节点转移
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
统计节点与扩容操作:addCount
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 竞争baseCount失败后通过counterCells数组,大大降低线程竞争消耗
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// CAS操作给baseCount+1失败
CounterCell a; long v; int m;
boolean uncontended = true;
// 判断当前位置:ThreadLocalRandom.getProbe() & m
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// cas给a+1
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 给counterCells数组指定下标+1失败,在fullAddCount中继续操作
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 计算集合总节点个数
s = sumCount();
}
// 扩容操作
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 集合的size>=扩容的阀值且数组不为空
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 数组转移方法
transfer(tab, nt);
}
// cas修改sizeCtl值
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// SIZECTL = resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2; 里面transfer有用到该值
transfer(tab, null);
s = sumCount();
}
}
}
5、get方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
// 数组不为空且该下标存在节点
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
// 值相等且equals方法相等
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
6、其他常用方法
size方法:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}
// 使用CounterCell统计节点个数
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
// 遍历counterCells数组并求和
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
isEmpty方法:
public boolean isEmpty() {
return sumCount() <= 0L; // ignore transient negative values
}