0
点赞
收藏
分享

微信扫一扫

理解 ConcurrentHashMap

小美人鱼失去的腿 2022-05-05 阅读 75

ConcurrentHashMap的实现原理

ConcurrentHashMap 在 JDK1.7 和 JDK1.8 的实现方式是不同的

JDK1.7下的ConcurrentHashMap

实现原理是数组加链表,jdk1.7下,ConcurrentHashMap由segment 数组结构和 hashEntry 数组结构构成的,每个hashEntry相当于HashMap 中的table数组,即 ConcurrentHashMap 把哈希桶数组切分成小数组(Segment ),每个小数组有 n 个 HashEntry 组成。

每个segment配备一把锁,当一个线程访问其中一段数据时,就可以给这段segment 进行上锁,这样在保证segment锁住的同时而不影响其他线程访问其他的segment,实现了真正的并发访问。

Segment 是 ConcurrentHashMap 的一个内部类,主要的组成如下:Segment继承了ReentrantLock,因此Segment 是一种可重入锁,Segment 的默认值为16,即并发度为16。

 存放元素的HashEntry也是一个静态内部类,其中用volatile 修饰了  HashEntry 的数据 value 和 下一个节点 next,保证了多线程环境下数据获取时的可见性

 

JDK1.8下的ConcurrentHashMap

 JDK1.8 中的ConcurrentHashMap 选择了与 HashMap 相同的Node数组+链表+红黑树结构;在锁的实现上,抛弃了原有的 Segment 分段锁,采用CAS + synchronized实现更加细粒度的锁。

1.8下,ConcurrentHashMap将锁的级别控制在了更细粒度的哈希桶级别,只要锁住这个桶位置的头节点,就不会影响其他桶数组元素的读写,大大提升了并发度。

 

 JDK1.8 中为什么使用内置锁 synchronized替换 可重入锁 ReentrantLock?

1、在jdk1.6中,对synchronized锁的实现进行了大量的优化,支持多种锁状态,会从无锁->偏向锁->轻量级锁->重量级锁一步步的升级转换。

2、减少内存的开销,假设使用可重入锁来获得同步支持,那么每个节点都需要继承AQS来获得同步支持,但是实际上,只有链表的头节点或者红黑树的根节点才需要同步,带来了巨大的内存浪费。

ConcurrentHashMap的源码解析

JDK1.7下的ConcurrentHashMap的源码

1、初始化:
initialCapacity :初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。
比如你初始容量是 64,Segment 的容量为16,那么每个段中哈希表的初始容量就为 64/16=4。
loadFactor:这个负载因子是给 段中哈希表 扩容时候使用的

Segment数组的长度是不可以被改变的,初始化如果不规定,那么就采用默认的 16

从源码中发现,创建ConcurrentHashMap的时候,在Segment第一个位置初始化了一个HashEntry,其他位置则没有,这是因为每次创建一个Segment对象的时候,要进行大量的初始化工作,首先初始化一个s0后,后面只要在涉及到创建Segment对象,只要 拿s0里面的数据当做模板就可以,这样会增加一定的效率。

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // 非法参数判断。
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 并行级别越界重设。
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    
    // Find power-of-two sizes best matching arguments 找到2次幂大小的最佳匹配参数.
    int sshift = 0; // 偏移量吧因该是。
    int ssize = 1; // segment的数组长度。
    
    // 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方
    // 如果这里我们的并行级别是16的话。
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 到这一步,sshift = 4, ssize = 16。

    // 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值
    this.segmentShift = 32 - sshift; // 段偏移量
    this.segmentMask = ssize - 1; // 段掩码

    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;

    // initialCapacity 是设置整个 map 初始的大小,
    // 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小。
    // 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个。
    // 那么此时段中哈希表的容量就为 4。
    int c = initialCapacity / ssize; // c 用于
    
    // 如果段中容量与segment的数组长度乘积小于了整个 ConcurrentHashMap 的初始容量,
    // 那我们就把目标容量(段中哈希表容量) + 1;
    if (c * ssize < initialCapacity)
        ++c;
    
    // 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上,
    // 插入一个元素不至于扩容,插入第二个的时候才会扩容
    int cap = MIN_SEGMENT_TABLE_CAPACITY; //先给此时的段中容量默认为 2 。
    // 如果段中默认容量小于目标容量的话就一直扩大到段中默认容量等于目标容量。
    while (cap < c)
        cap <<= 1;

    // 创建段数组的第一个元素 segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    // 创建段数组,数组长度就为 ssize 。
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 往段数组中有序的写入 segment[0]。
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

2.put操作,这个方法主要代码就是定位到相应的Segment,然后针对这个Segment进行put操作;

public V put(K key, V value) {
    // 先建一个段的临时桶。
    Segment<K,V> s;
    // 如果要put的值为空的话就抛出异常。
    if (value == null)
        throw new NullPointerException();
    
    // 计算 key 的 hash 值
    int hash = hash(key);
    
    // 根据 hash 值找到段表中的位置 j。
    // hash 是 32 位,无符号右移 segmentShift(28) 位,剩下高 4 位,
    // 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的高 4 位,也就是槽的数组下标
    /*
    	0000 0000 0000 0000 0000 0000 0000 1101
    	0000 0000 0000 0000 0000 0000 0000 1111
    										&
    	----------------------------------------
    	0000 0000 0000 0000 0000 0000 0000 1101
    */
    
    int j = (hash >>> segmentShift) & segmentMask;
    // 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null。
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j); // 初始化 j 处的桶。
    // 插入新值到 s 中。
    return s.put(key, hash, value, false);
}

首先看一下ensureSegment()这个方法;

在put方法中,首先尝试获取j位置的桶对象,如果这个位置桶对象不为null,那么就直接执行后续的put操作,如果为空,则进入ensureSegment方法

看到这个方法里面,还是不断的通过UNSAFE.getObjectVolatile(ss, u)这个方法去判断对应坐标位置的Segment对象被其他线程创建了,不断的去判断是因为创建Segment对象需要初始化大量的属性,如果在cas创建Segment之前仅做一次判断,那么会浪费内存空间。

确保Segment对象为null时,就通过CAS来创建Segment对象,这个操作是原子性的

最终方法返回seg对象,不管这个seg对象是哪个线程创建的,只要被创建了,就把他返回。

private Segment<K,V> ensureSegment(int k) {
    // 临时段表。
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // 注意,这里是getObjectVolatile这个方法,这个方法的意思就是,别的线程要是修改了segment[k],这个线程是可见的。
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 这里看到为什么之前要初始化 segment[0] 了,
        // segment[0] 就相当于一个初始化模板,
        // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k],
        // 为什么要用“当前”,因为 segment[0] 可能早就扩容过了。
        Segment<K,V> proto = ss[0]; // 获取当前 segment[0] 作为初始化模板。
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);

        // 初始化 segment[k] 内部的哈希表。
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 再次检查一遍该槽是否被其他线程初始化了。
        // 也就是在做上面那些操作时,看看是否有别的线程操作过 segment[k]。
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { 
			
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 构建新的 segment 对象。
       
            // 再次检查 segment [k] 是否为null。
            // 注意,这里是while,之前的if,也是起到如果下面操作失败,再次检查的作用。
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    // CAS操作,这里的比较并交换在CPU里面就是一条指令,保证原子性的。
                    // 不存在那种比较完毕之后的间隙,突然切换到别的线程来修改这个值的情况。
                    break;
            }
        }
    }
    
    // 返回 segment 对象。
    // 这里返回的seg可能是自己new的,也可能是别的线程new的,反正只要其中一个就好了。
    return seg;
}

得到Segment对象后,就可以对这个Segment对象内部的HashEntry执行put操作了

final V put(K key, int hash, V value, boolean onlyIfAbsent) {

    // 再 put 到指定段中之前,我们得获取到当前段表中这个桶的独占的锁。
    // 以此来保证整个过程,只有我们一个线程在对这个桶做操作。
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
   
    V oldValue; // 用来存储被覆盖的值。
    
    try {
        // 这个是段表某个桶内部的哈希表。
        HashEntry<K,V>[] tab = table;
        
        // 再次利用待插入键值对 key 的 hash 值,求应该放置的数组下标。
        int index = (tab.length - 1) & hash;
        // first 是桶中哈希表的待插入桶处的链表的表头。
        HashEntry<K,V> first = entryAt(tab, index);

        // 遍历链表。
        for (HashEntry<K,V> e = first;;) {
            // 这个遍历主要是为了找出,key重复的情况。
            if (e != null) {
                K k;
                // 如果当前链表节点的key重复了的话。
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k)))
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆盖旧的值。
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
            
                e = e.next;
            }
            else {
                // node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
                // 如果不为 null。
                if (node != null)
                    // 那就把它设置为链表表头,JDK1.7使用头插法。
                    node.setNext(first);
                else
                    // 否则如果是null,就先初始化它再设置为链表表头。
                    node = new HashEntry<K,V>(hash, key, value, first);

                int c = count + 1;
                // 如果超过了段表中该桶中哈希表的阈值,这个哈希表就需要扩容。
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 扩容。
                else
                    // 没有达到阈值,将 node 放到哈希表的 index 位置,
                    // 其实就是将新的节点设置成原链表的表头,使用头插法。
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解锁
        unlock();
    }
    return oldValue;
}

scanAndLockForPut()获取写入锁

首先尝试获取锁,如果未获取到,则执行scanAndLockForPut方法,这个方法就是不断的循环获取锁,如果获取失败了,就进行一些准备工作

(retries & 1) == 0这个判断代表重试次数为偶数的时候进行后面的判断,为什么会有这个设计呢?我想可能是因为后面的代码执行也需要时间,有可能在执行后续代码的时候,此线程失去了获取锁的机会,造成锁饥饿吧。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    // 简单来说就是拿到段表中某个桶中的哈希表数组中通过hash计算的那个下标下的第一个节点。
    HashEntry<K,V> first = entryForHash(this, hash);
    // 备份一下当前节点的内容。
    HashEntry<K,V> e = first;
    // 辅助变量。
    HashEntry<K,V> node = null;
    int retries = -1; // 用于记录获取锁的次数。

    // 循环获取锁。
    // 如果获取失败,就会去进行一些准备工作。
    while (!tryLock()) {
        // 辅助变量用于重复检查,
        // 用来检查对应段表中那个桶上的哈希表数组中对应索引桶处,之前取出来的第一个节点是否还是我们之前取得那个。
        HashEntry<K,V> f; // to recheck first below
        
        // 准备工作。
        // 因为准备工作也不需要每次循环都去做对吧,最好的预期,做一次准备工作就够了。
        if (retries < 0) {
            // 判断段表中对应桶中的哈希表的对应桶上的节点 HashEntry 是不是还没被初始化。
            if (e == null) {
               
                if (node == null) // speculatively create node
                    // 进到这里说明数组该位置的链表是空的,没有任何元素。
                    // 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置。
                    // 将我们即将插进去的元素,构建成一个HashEntry节点对象。
                    node = new HashEntry<K,V>(hash, key, value, null);
                
                // 将 retries 赋值为0,不让准备工作重复执行。
                retries = 0;
            }
            // 否则的话,判断 key 是否有重复的。
            else if (key.equals(e.key))
                // 将 retries 赋值为0,不让准备工作重复执行。
                retries = 0;
            else
                // 否则顺着链表往下走。
                e = e.next;
        }
        
        // 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁,避免cpu空转。
        // lock() 是阻塞方法,直到获取锁后返回。
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 && // 偶数次数才进行后面的判断。
                 // 这个时候出现问题了,那就是有新的元素进到了链表,成为了新的表头。
                 // 也可以说是链表的表头被其他线程改变了。
                 
                 // 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法。
                 (f = entryForHash(this, hash)) != first) {
            
            // 此时怎么做呢,
            // 别的线程修改了该segment的节点,重新赋值e和first为最初值,和第一二行代码一样的效果。
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    
    // 将准备工作制作好的节点返回。
    return node;
}

面试被问到 ConcurrentHashMap答不出 ,看这一篇就够了!_Java烂猪皮V的博客-CSDN博客

理解Java7和8里面HashMap+ConcurrentHashMap的扩容策略

举报

相关推荐

0 条评论