0
点赞
收藏
分享

微信扫一扫

既生AtomicXXX,何生LongAdder?

既生AtomicXXX,何生LongAdder?

LongAdder是JDK1.8在​​java.util.concurrent.atomic​​包下新引入的 为了高并发下实现高性能统计的类。

不是有​​AtomicInteger​​ 和 ​​AtomicLong​​嘛,咋还蹦出来个​​LongAdder​​???

首先,我们要知道的是,无论是​​AtomicInteger​​还是​​AtomicLong​​,本质上都是利用CAS的思想,来保证的变量的原子性,虽然说比较好用,但是在高并发环境下,多个线程争夺同一个资源时,如果自旋一直不成功,会一直循环,一直占用CPU,这会极大的浪费CPU资源。

public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

所以,​​LongAdder​​应运而生,其类似于分治的思想,将这种并发竞争给分散开来,分而治之,从而达到减少竞争,降低冲突的目的。

LongAdder原理解析

LongAdder内部维护一个 volatile变量​​base​​和一个​​Cell​​类型的数组,每个​​Cell​​类内部也会维护一个volatile字段。

没有竞争的前提下,跟​​AtomicXXX​​一样,通过CAS base​这个变量来完成数据的更新

一旦发生冲突,即CAS失败后,将会启用Cell​数组,通过将当前线程的hash值与Cell​数组长度取模,获取到其对应的Cell类,再对此Cell类的内部变量进行CAS,从而达到数据的更新和减少竞争的目的。

图示如下:

既生AtomicXXX,何生LongAdder?_数组

add

源码解读

public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

final boolean casBase(long cmp, long val) {
// CAS更新base值
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

通过对上面源码的解读,我们可以了解到

  1. 在没有冲突的情况下,也就是casBase​一直成功的时候,会直接返回
  2. 一旦CAS失败返回false,会去使用Cell​数组
  3. if​语句中我们可以看到一个小的优化点,就是如果在​Cell​数组合法及计算出当前线程对应得​​​Cell​​​不为null的情况下,会再次去尝试一次CAS,如果能成功的话就不去调用​​​longAccumulate​​​方法了

longAccumulate

进入时机:

  1. 第一次发生竞争时,​​Cell​​数组为null的情况下
  2. 之后发生竞争时,当前线程CAS对应​​Cell​​类失败的情况下

源码解读

/**
* 扩容和初始化Cell数组时,通过CAS实现的一个自旋锁
*/
transient volatile int cellsBusy;

/**
* Cell数组最大长度,为核数
*/
static final int NCPU = Runtime.getRuntime().availableProcessors();

final boolean casCellsBusy() {
// 获取锁,将cellsBusy置为1
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 对当前线程的hash值做一次校验,如果hash值没有被初始化,则强制初始化一下
// 线程hash默认是0,也就是说当前线程是第一次进入该方法
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
// 当前线程第一次进入该方法,表示当前线程还未参与竞争,参与竞争的线程hash都不为0
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// Cell数组已经初始化过
if ((as = cells) != null && (n = as.length) > 0) {
// 当前线程对应的Cell还未初始化
if ((a = as[(n - 1) & h]) == null) {
// 获取锁
if (cellsBusy == 0) {
// 创建一个新的Cell
Cell r = new Cell(x);
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
Cell[] rs; int m, j;
// 再次校验当前线程对应的Cell是否为null
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
// 释放锁,重新置为0
cellsBusy = 0;
}
// 创建成功,跳出循环
if (created)
break;
continue; // Slot is now non-empty
}
}
// 获取锁失败,说明发生冲突,将collide置为false
collide = false;
}
else if (!wasUncontended)
// 调用add方法时,在if里面的cas操作失败了
// 已经知道cas会失败,下面会为当前线程重新计算hash值
wasUncontended = true;
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
// 当前线程对应的Cell不为null,CAS成功,直接返回
break;
else if (n >= NCPU || cells != as)
// 如果Cell数组长度已经到达了最大值,或者当前Cell数组已经扩容过了,则需要重试
collide = false;
else if (!collide)
// 需要重试
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 检查是否已经扩容
if (cells == as) {
// Cell数组扩容,每次扩容为原来的2倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//为当前线程重新计算hash值
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 第一次竞争,Cell数组还未初始化,需要进行初始化
boolean init = false;
try { // Initialize table
if (cells == as) {
// 默认容量为2
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
// 初始化成功,跳出循环
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
// 兜底策略,再次尝试使用cas base
break;
}
}

sum

通过以上的源码了解,相信各位小伙伴已经知道了

LongAdder获取当前value值的方式,就是通过遍历Cell​数组,累加其中的​value​值,最后再加上​​​base​​​值,就是当前的​​​value​​​了。源码如下:

public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

但是,这样获取的​​value​​值可能是不准确的,因为再遍历​​Cell​​数组的过程中,可能有别的线程将已经遍历过的​​Cell​​又进行数据更新,这样一来,当前线程获取的​​value​​值就是不准确的了。

LongAdder流程分析

案例分析

base基础值是:10,Cell数组已经初始化过

此时有三个线程并发来进行自增操作

线程1 CAS成功,也代表着线程2,线程3 CAS失败,此时,base = 11

线程2,线程3,通过hash值计算得到不同的Cell数组下标,分别对当前Cell类内部value值CAS自增

最终统计sum时,遍历Cell数组,累加得到2,再加上base=11,则sum = 13

既生AtomicXXX,何生LongAdder?_数组_02

扩展性知识

LongAccumulator

​LongAdder​​,只能对​​value​​进行加减操作,可能在一些特殊场景下下无法满足我们的需求,这时候就有了​​LongAccumulator​

​LongAccumulator​​原理本质上跟​​LongAdder​​一致,只是提供了自定义修改value的方式。

public class LongAdderTest {

public static void main(String[] args) {
LongAccumulator accumulator = new LongAccumulator((x, y) -> x * y, 10);
accumulator.accumulate(2);

System.out.println(accumulator.longValue());
}

}

原理也很简单,传入一个自定义计算的表达式,在CAS更新的时候,将计算好的值作为要更新的值就ok啦

既生AtomicXXX,何生LongAdder?_数组_03

当然啦,因为​​LongAccumulator​​计算当前值跟​​LongAdder​​原理一致,所以不太精准。

但是没关系,​​AtomicInteger​​也有类似的方法

public class LongAdderTest {

public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(1);
atomicInteger.getAndUpdate(x -> x * 3);

System.out.println(atomicInteger.get());
}

}

Cell类使用Contended注解来消除伪共享

有关伪共享的知识,大家可以去了解一下。

@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

@sun.misc.Contended。加上这个注解的类会自动补齐缓存行,需要注意的是此注解默认是无效的,需要在jvm启动时设置-XX:-RestrictContended才会生效。

小彩蛋

在写这篇文章的时候,知道​​LongAdder​​为减少竞争使用Cell数组,内部通过hash值取模定位数组,有想到这做法不是跟Map一样嘛,哈哈

后来又了解了,​​LongAdder​​统计当前元素值可能不太精准的时候,心想,​​ConcurrentHashMap​​是咋统计的,于是我跑去瞅了瞅,这不看不知道,一看吓一跳,哈哈

不能说是有点类似,只能说是一摸一样

既生AtomicXXX,何生LongAdder?_数组_04

既生AtomicXXX,何生LongAdder?_ide_05

​ConcurrentHashMap​​每次​​put​​ 完元素后,会更新​​size​​,就会去调用​​addCount​​方法

这里面其实跟​​LongAdder​​一样,通过数组 + base值来处理,最终统计也是遍历数组累加 + base值得到的结果,所以​​ConcurrentHashMap​​的​​size​​也不准确

惭愧啊~,要不是今天看了,我都忘记了,明明记得之前看过​​ConcurrentHashMap​​源码的

举报

相关推荐

研究生总结

0 条评论