0
点赞
收藏
分享

微信扫一扫

ConcurrentHashMap源码详解(JDK1.7&1.8)

夕颜合欢落 2022-04-14 阅读 33
javaidea

a、JDK1.7.80源码

ConcurrentHashMap基本结构图(其实就是把HashMap进行切分通过Segment进行管理)在这里插入图片描述

1、Segment结构及参数及方法

  1. 基本类结构(继承了ReentrantLock,可以基于ReentranLock对Segment进行相应的并发控制)

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
    
  2. Segment基本参数

    // 获取锁的最大次数,多核处理器为64,单核处理器为1
    static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    // Segment对象的HashEntry数组
    transient volatile HashEntry<K,V>[] table;
    // Segment对象中节点的数量,用来算size与判断isEmpty
    transient int count;
    // 操作Segment对象的次数,统计size与判断isEmpty
    transient int modCount;
    // 下一次要扩容的阀值(该值=capacity * load factor)
    transient int threshold; 
    // table的加载因子
    final float loadFactor;
    
  3. HashEntry基本信息

    基本参数(与HashMap的Entry节点一致)

    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
    
  4. Segment构造器

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;//设置加载因子
        this.threshold = threshold;//设置扩容阀值
        this.table = tab;//表数组设置
    }
    
  5. Segment的put方法

    // 可能多个线程同时调用一个Segment对象的put方法
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        /**
         * tryLock():非阻塞,尝试获取锁
         * lock():阻塞加锁
         * 下面操作加不上锁会导致CPU爆满,优点可以在循环内做相应业务逻辑
         * while(!tryLock()) {
         *     // 业务逻辑
         * }
         */
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            // 定位到Segment下面数组的下标
            int index = (tab.length - 1) & hash;
            // 获取数组index下标节点(首节点)
            HashEntry<K,V> first = entryAt(tab, index);
            // 遍历该节点下链表
            for (HashEntry<K,V> e = first;;) {
                if (e != null) { // 链表不为空
                    K k;
                    // hash值相等且equals相等
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount; // 统计操作集合次数,fair-fast机制
                        }
                        break;
                    }
                    e = e.next; //遍历
                } else {
                    if (node != null)
                        // 头插法
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node); // 超过扩容阀值进行数组扩容
                    else
                        // 使用UNSAFE.putOrderedObject方法插入
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            // ReentranLock必须手动释放锁
            unlock();
        }
        return oldValue;
    }
    
    // 获取数组指定下标节点
    static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab == null) ? null :
        (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)i << TSHIFT) + TBASE);
    }
    
  6. ​ 获取锁的方法:scanAndLockForPut

    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
        HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        HashEntry<K,V> node = null;
        int retries = -1; // negative while locating node
        // 尝试加锁的过程可以完成一些准备工作
        while (!tryLock()) { // scanAndLock方法流程一致
            HashEntry<K,V> f; // to recheck first below
            if (retries < 0) {
                if (e == null) { // 当前table数组当前下标未有节点或遍历完了
                    if (node == null) // speculatively create node
                        node = new HashEntry<K,V>(hash, key, value, null);
                    retries = 0;
                }
                else if (key.equals(e.key)) // 重复key节点
                    retries = 0;
                else
                    e = e.next; // 依次遍历链表
            }
            else if (++retries > MAX_SCAN_RETRIES) {
                lock(); // 阻塞加锁
                break; // 获取成功
            }
            // (retries & 1) == 0偶数才会成立,1.7采用头插法,观察首节点有没有被修改
            else if ((retries & 1) == 0 &&
                     (f = entryForHash(this, hash)) != first) {
                e = first = f; // re-traverse if entry changed
                retries = -1; // 重新遍历链表
            }
        }
        return node;
    }
    
    // 根据segment对象和hash值得到segment对象下对应的table数组对应的节点(首节点)
    static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
        HashEntry<K,V>[] tab;
        return (seg == null || (tab = seg.table) == null) ? null :
        (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
    }
    
  7. scanAndLock方法与上述方法中间的步骤其实一致的

    private void scanAndLock(Object key, int hash) {
        // similar to but simpler than scanAndLockForPut
        HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        int retries = -1;
        while (!tryLock()) {
            HashEntry<K,V> f;
            if (retries < 0) {
                if (e == null || key.equals(e.key))
                    retries = 0;
                else
                    e = e.next;
            }
            else if (++retries > MAX_SCAN_RETRIES) {
                lock();
                break;
            }
            else if ((retries & 1) == 0 &&
                     (f = entryForHash(this, hash)) != first) {
                e = first = f;
                retries = -1;
            }
        }
    }
    
  8. 数组扩容方法:rehash

    private void rehash(HashEntry<K,V> node) {
        HashEntry<K,V>[] oldTable = table;
        int oldCapacity = oldTable.length;
        // 按倍扩容
        int newCapacity = oldCapacity << 1;
        // 计算下一扩容阀值
        threshold = (int)(newCapacity * loadFactor);
        // 创建扩容后数组
        HashEntry<K,V>[] newTable =
            (HashEntry<K,V>[]) new HashEntry[newCapacity];
        int sizeMask = newCapacity - 1;
        // 遍历老数组
        for (int i = 0; i < oldCapacity ; i++) {
            HashEntry<K,V> e = oldTable[i];
            if (e != null) {
                HashEntry<K,V> next = e.next;
                // 计算新数组下标位置
                int idx = e.hash & sizeMask;
                if (next == null)   //  Single node on list
                    newTable[idx] = e; // 直接将老数组单个节点放入新数组对应下标即可
                else { // Reuse consecutive sequence at same slot
                    HashEntry<K,V> lastRun = e;
                    int lastIdx = idx; // idx记录该链表首节点对应在新数组的下标位置
                    for (HashEntry<K,V> last = next;
                         last != null;
                         last = last.next) {
                        // 计算当前节点在新数组的下标值(要么一样,要么在+oldCapacity位置)
                        int k = last.hash & sizeMask;
                        if (k != lastIdx) {
                            lastIdx = k; // 实时记录不同槽位情况的下标
                            lastRun = last; // 实时记录最后一个不同槽位的节点
                        }
                    }
                    // lastRun~链表最后连续的节点都转移到新数组对应下标
                    newTable[lastIdx] = lastRun;
                    // 遍历首节点~lastRun节点,将剩余依次转移到新数组对应下标
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask; // 重新计算下标
                        HashEntry<K,V> n = newTable[k];
                        // 头插法
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        } // 新老数组转移结束
        // 新节点新数组下标
        int nodeIndex = node.hash & sizeMask; // add the new node、
        // 头插法
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        table = newTable; //指针指向新表
    }
    
  9. remove方法

    final V remove(Object key, int hash, Object value) {
        if (!tryLock())
            // 获取锁操作
            scanAndLock(key, hash);
        V oldValue = null;
        try {
            HashEntry<K,V>[] tab = table;
            int index = (tab.length - 1) & hash;
            HashEntry<K,V> e = entryAt(tab, index);
            HashEntry<K,V> pred = null;
            // 遍历链表,找到对应要remove的节点
            while (e != null) {
                K k;
                HashEntry<K,V> next = e.next;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    V v = e.value;
                    if (value == null || value == v || value.equals(v)) {
                        if (pred == null)
                            setEntryAt(tab, index, next);
                        else
                            // 直接指向下一个
                            pred.setNext(next);
                        ++modCount;
                        --count;
                        oldValue = v;
                    }
                    break;
                }
                pred = e;
                e = next;
            }
        } finally {
            unlock();
        }
        return oldValue;
    }
    
  10. replace方法

    final V replace(K key, int hash, V value) {
        if (!tryLock())
            scanAndLock(key, hash);
        V oldValue = null;
        try {
            HashEntry<K,V> e;
            for (e = entryForHash(this, hash); e != null; e = e.next) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    e.value = value;//只进行值的覆盖
                    ++modCount;
                    break;
                }
            }
        } finally {
            unlock();
        }
        // 返回老值
        return oldValue;
    }
    
  11. clear方法

    final void clear() {
        lock();
        try {
            HashEntry<K,V>[] tab = table;
            // 遍历数组
            for (int i = 0; i < tab.length ; i++)
                setEntryAt(tab, i, null);//直接用null节点覆盖
            ++modCount;
            count = 0;
        } finally {
            unlock();
        }
    }
    

2、集合的基本参数

// 整个集合中节点总数的默认值
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 数组的扩容因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 支持的并发数量,其实就是Segment数组的大小
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 节点总数最大值
static final int MAXIMUM_CAPACITY = 1 << 30;
// Segment对象最小的节点数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// 最大的Segment对象数量,二进制16位
static final int MAX_SEGMENTS = 1 << 16;
/**
 * 默认自旋次数,超过这个次数直接加锁,防止在size方法中由于不停有线程在更新map,导致无限的进行自旋影响性能。
 * 只在size()/containsValue()方法使用
 */
static final int RETRIES_BEFORE_LOCK = 2;
// Segment对象掩码值,也就是Segment数组长度-1,用来与运算hash算法确定Segment数组下标
final int segmentMask;
/**
 * 用于定位参与hash运算的位数,this.segmentShift = 32 - sshift;(构造方法中)
 * 这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的
 * sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次
 * 所以sshift等于4
 */
final int segmentShift;
// Segment对象数组
final Segment<K,V>[] segments;
// 其他
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;

3、构造器方法

// 默认构造
public ConcurrentHashMap() {
    // 16, 0.75, 16
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
// 提供初始大小构造
public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
// 两个参数构造
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

// 终极构造,initialCapacity为整个ConcurrentHashMap中所有数组总共的Entry节点个数
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    // 基本信息校验
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 下面找到>=concurrencyLevel最小2次幂方数
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // segmentShift是用于定位参与hash运算的位数
    this.segmentShift = 32 - sshift;
    /**
     * segmentMask是哈希运算的掩码,长度-1参与
     * 为啥这里构造方法中就确定这个全局的segment掩码值,而HashMap中会在put时使用数组长度-1动态获取
     * 因为一旦Segment对象长度指定之后,ConcurrentHashMap后续不会对Segment进行扩容,并发度不会改变
     */
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // 初始节点总数量/segment对象数量=每个Segment对象下Entry数量
    int c = initialCapacity / ssize;
    /**
     * 为啥先除再乘ssize呢?这里是向上取整
     * 如initialCapacity=12时,ssize=8,c其实为1.5,但12/8=1,下述是向上取整操作
     * 目的:为了保证最后创建出来的Entry数必须大于等于你想要创建的initialCapacity数量
     */
    if (c * ssize < initialCapacity)
        ++c;//向上取整
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 确保cap为>=c的2的次方数
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    // ssize一定是2幂次方数,为了保证按位与的hash算法能均匀分布在[0,ssize]上
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 通过直接操作内存将s0对象放在Segment数组第0个位置
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

4、put方法

public V put(K key, V value) {
    Segment<K,V> s;
    // 不允许key为空
    if (value == null)
        throw new NullPointerException();
    // 计算key的hash值
    int hash = hash(key);
    // 计算Segment数组的下标
    int j = (hash >>> segmentShift) & segmentMask;
    // 判断Segment的j位置是否为空,基于UNSAFE下的线程安全操作
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        /**
         * Segment的j位置为空时进行Segment对象的创建
         * 这里不在乎是哪个线程创建的,多线程下能创建成功即可
         */
        s = ensureSegment(j);
    // 调用Segment的put方法,与HashMap的put差不多,就保证了线程安全
    return s.put(key, hash, value, false);
}

构建Segment对象方法:ensureSegment

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    // 获取Segment数组的偏移量
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // 再次判断Segment数组在u位置是否为空
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 下面直接使用Segment数组的Segment[0]参数进行Segment[k]的构建
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 重复检查是否为空
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                // 使用CSA操作(无锁操作),将创建出来的Segment对象s放入到ss数组的u位置上去
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) //失败返回false,继续while循环
                    break;//成功直接返回
            }
        }
    }
    return seg;
}

5、get方法

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    // 通过hash值算出Segment数组的下标
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 遍历Segment对象的下Entry数组
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // key的hash值相等且equals方法相等才返回
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

6、其他常用方法

size方法:

public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                // 给所有Segment对象加锁
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            // 遍历Segment数组
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    // size为所有Segment对象的count和
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            // 给所有Segment对象释放锁
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

isEmpty方法:

public boolean isEmpty() {
    long sum = 0L;
    final Segment<K,V>[] segments = this.segments;
    // 遍历Segment数组
    for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
            if (seg.count != 0)
                return false; // 存在节点就返回false
            sum += seg.modCount;
        }
    }
    // 若Segment对象里节点的修改次数不为0(有修改),再重新检查一遍
    if (sum != 0L) { // recheck unless no modifications
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                if (seg.count != 0)
                    return false;
                sum -= seg.modCount;
            }
        }
        if (sum != 0L)
            return false;
    }
    return true;
}

b、JDK1.8.0.231源码

ConcurrentHashMap基本结构图(数组+单向链表+双向链表+红黑树)

1、集合结构及基本参数

  1. 基本类结构

    public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
        							 implements ConcurrentMap<K,V>, Serializable {
    
  2. 集合基本参数

    // 数组最大长度
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    // 数组初始大小
    private static final int DEFAULT_CAPACITY = 16;
    // 使用toArray方法最大数组的大小限制(超过会抛OOM异常)
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    // 链表转化为红黑树的阀值
    static final int TREEIFY_THRESHOLD = 8;
    // 由红黑树还原为链表的阀值(为啥不是8呢?防止在临界值8的时候进行节点的增删操作,导致频繁转化,影响效率)
    static final int UNTREEIFY_THRESHOLD = 6;
    /**
     * 最小树形化容量阈值
     * (当哈希表中的容量 > 该值时,才允许树形化链表,否则,若桶内元素太多时,则直接扩容,而不是树形化。
     * 为了避免进行 * 扩容、树形化选择的冲突,这个值不能小于 4 * TREEIFY_THRESHOLD)
     */
    static final int MIN_TREEIFY_CAPACITY = 64;
    // 默认最小步长
    private static final int MIN_TRANSFER_STRIDE = 16;
    // sizeCtl的stamp bit
    private static int RESIZE_STAMP_BITS = 16;
    // 节点的几种状态
    static final int MOVED     = -1;
    static final int TREEBIN   = -2;
    static final int RESERVED  = -3;
    static final int HASH_BITS = 0x7fffffff;
    // 虚拟处理器数量(并不是实际CPU核心数量)
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    // 数组
    transient volatile Node<K,V>[] table;
    // 扩容时下一个表
    private transient volatile Node<K,V>[] nextTable;
    // 基本计数变量
    private transient volatile long baseCount;
    // 数组初始化及扩容的阀值,初始值为0
    private transient volatile int sizeCtl;
    // 转移元素时下一个坐标
    private transient volatile int transferIndex;
    // 其他线程是否在操作counterCells数组,初始值0-未操作
    private transient volatile int cellsBusy;
    // 统计操作数组-用来统计集合size
    private transient volatile CounterCell[] counterCells;
    

2、节点的结构:Node与TreeNode

Node节点基本结构

  1. 基本参数

    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    
  2. 基本方法

    public final int hashCode() { 
        return key.hashCode() ^ val.hashCode();
    }
    // key、value都不为空且值与equals方法相等
    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }
    

3、构造器方法

// 默认构造
public ConcurrentHashMap() {
}
// 给定初始化大小构造器
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}
// 使用现成集合构造
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}
// 给定初始化数组大小与加载因子构造器
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}
// 最终调用构造
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor); // 计算扩容阀值
    int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap; // 扩容阀值
}

4、put方法

public V put(K key, V value) {
    return putVal(key, value, false);
}
// 实际调用
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 直接控制key和value不为空,1.7只控了value,但key为空时会在逻辑报错
    if (key == null || value == null) throw new NullPointerException();
    // 计算hash值
    int hash = spread(key.hashCode());
    int binCount = 0; // 统计链表节点个数
    // 循环数组
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); // 初始化数组
        // 计算key的数组下标:(n - 1) & hash, f为数组下标首节点
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 数组下标无节点
            // CAS操作将新节点放入该位置
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break; // CAS成功终止循环,失败继续循环
        }
        else if ((fh = f.hash) == MOVED)
            // 其他线程正在进行扩容操作,这里也需要辅助扩容保证线程安全
            tab = helpTransfer(tab, f);
        else { // 对链表进行遍历/操作
            V oldVal = null;
            synchronized (f) { // 对数组下标首节点进行加锁
                if (tabAt(tab, i) == f) { // 检查当前数组下标首节点是否为f
                    if (fh >= 0) {
                        // 用来记录链表长度,来决定是否转化为红黑树
                        binCount = 1;
                        // 遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break; // 若key节点存在,直接新值覆盖老值并返回老值
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) { // 一直遍历到尾结点
                                // 尾插法
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break; // 插入完成,终止循环
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 为红黑树结构
                        Node<K,V> p;
                        binCount = 2;
                        // 进行红黑树节点插入
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            } // synchronized结束了,下面为啥不在锁里面进行呢?为了使treeifyBin方法更通用吧,在treeifyBin方法中通用也会对首节点加锁
            if (binCount != 0) {
                // 若链表长度达到链表转化红黑树阀值,进行转化
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i); // 链表转化为红黑树
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 统计Node节点个数与并发扩容操作
    addCount(1L, binCount);
    return null;
}

初始化数组方法:initTable

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            /**
             * yield是先放弃CPU资源,然后会再与其他线程竞争CPU资源
             * 极端情况下:A线程让出后马上能竞争到CPU资源,也会引起CPU100%
             */
            Thread.yield(); // lost initialization race; just spin
        // CAS操作让sc-1
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // 数组为空时才进行初始化
                if ((tab = table) == null || tab.length == 0) {
                    // 默认大小
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 创建新数组
                    table = tab = nt;
                    // n-1/4n=0.75n(下一扩容阈值)
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

多线程下辅助转移节点:helpTransfer

// 如果其他线程正在转移节点中,新线程通过该方法进行并发辅助转移
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 每个线程进行辅助转移时,会将SIZECTL加1
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 新老数组进行节点转移
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

转移节点方法:transfer

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 计算步长
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) { // 新数组为空,进行初始化
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // n << 1成倍扩容
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // 节点正在转移时的标记节点
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) { // bound为步长
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            // --i >= bound条件包含两个信息:1、--i表明是从后往前循环 2、>=bound是转移[bound, i]步长范围内的节点
            if (--i >= bound || finishing)
                advance = false; // 转移完成了,停止转移
            // 将nextIndex设置为transferIndex的长度,transferIndex在下面会通过CAS操作更新的
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            /**
             * compareAndSwapInt(Object o, long offset, int expected, int x);
             * 读取传入对象o在内存中偏移量为offset位置的值与期望值expected作比较。
             * 相等就把x值赋值给offset位置的值。方法返回true。不相等,就取消赋值,方法返回false。
             * 下述操作会将TRANSFERINDEX偏移量的值transferIndex改为nextBound
             */
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
                      // 计算下一个步长的数组下标nextBound
                      nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                /**
                 * 随着bound与i的动态计算,可以确定每次移动的节点范围是按照步长来的
                 * 老数组具体移动范围:[bound, i]
                 */
                bound = nextBound; // 数组步长下标
                i = nextIndex - 1; // 结束下标
                advance = false; // 终止循环
            }
        }
        // n为老数组的长度,nextn新数组长度
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) { // 当前线程已完成转移了
                nextTable = null;
                table = nextTab;
                // 计算数组扩容阀值,(n << 1) - (n >>> 1) = 新数组长度 * 0.75
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // CAS给sizeCtl减1并且次数的sc已经赋值成sizeCtl了
            /**
             * CAS给sizeCtl减1并且次数的sc已经赋值成sizeCtl了
             * 此时sc = sizeCtl = SIZECTL = resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
             */
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 这里其实就是判断现在的sc是否等于传进来初始值
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return; // 证明其他线程还在进行节点转移(helpTransfer方法进入的)
                // 走到这里证明所有线程都已完成节点转移操作
                finishing = advance = true; // 置为节点转移完成状态
                i = n; // recheck before commit
            }
        }
        // 若当前数组下标节点为空,直接放置一个已转移的标记节点fwd
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 当前节点正在转移中,继续其他节点转移
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else { // 正在进行节点转移
            synchronized (f) { // 转移时对数组下标的首节点加锁
                if (tabAt(tab, i) == f) { // 检查之前获取的节点未被改变
                    Node<K,V> ln, hn;
                    if (fh >= 0) { // 链表节点转移
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) { // 红黑树节点转移
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

统计节点与扩容操作:addCount

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // 竞争baseCount失败后通过counterCells数组,大大降低线程竞争消耗
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        // CAS操作给baseCount+1失败
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // 判断当前位置:ThreadLocalRandom.getProbe() & m
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            // cas给a+1
            !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // 给counterCells数组指定下标+1失败,在fullAddCount中继续操作
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // 计算集合总节点个数
        s = sumCount();
    }
    // 扩容操作
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 集合的size>=扩容的阀值且数组不为空
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // 数组转移方法
                    transfer(tab, nt);
            }
            // cas修改sizeCtl值
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // SIZECTL = resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2; 里面transfer有用到该值
                transfer(tab, null);
            s = sumCount();
        }
    }
}

5、get方法

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    // 数组不为空且该下标存在节点
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            // 值相等且equals方法相等
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

6、其他常用方法

size方法:

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}
// 使用CounterCell统计节点个数
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        // 遍历counterCells数组并求和
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

isEmpty方法:

public boolean isEmpty() {
    return sumCount() <= 0L; // ignore transient negative values
}
举报

相关推荐

0 条评论