Java AtomicLong 和 LongAdder

AtomicLong

AtomicLong是JUC包提供的原子性操作类,其内部通过CAS保证了对计数的原子性更新操作。

java.util.concurrent.atomi

1
2
3
4
5
6
7
8
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}

内部是通过UnSafe(rt.jar)这个类的CAS操作来保证对内部的计数器变量long value进行原子性更新的。其中unsafe.getAndAddLong的代码如下:

1
2
3
4
5
6
7
8
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
var6 = this.getLongVolatile(var1, var2); //(1)
} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); //(2)

return var6;
}

可知最终调用的是native方法compareAndSwapLong原子性操作。

假如多个线程调用同一个AtomicLong对象进行更新操作(getAndIncrement),会发生什么事情?

多个线程都会执行到unsafe.getAndAddLong方法,然后多个线程都会执行到代码(1)处获取计数器的值。接着多个线程再去(2)尝试更新值,由于CAS具有原子性,所以只有一个线程会更新成功,然后返回true从而退出循环,整个更新操作就完成了。其他线程则CAS失败后返回false,则循环一次在次从(1)处获取当前计数器的值,然后在尝试执行(2),这叫做CAS的自旋操作,本质是使用CPU资源换取使用锁带来的上下文切换等开销。


LongAdder

AtomicLong类为开发人员使用线程安全的计数器提供了方便,但是AtomicLong在高并发下存在一些问题。比如,当大量线程调用同一个AtomicLong的实例的方法时候,同时只有一个线程会CAS计数器的值成功,失败的线程则会原地占用cpu进行自旋转重试,这回造成大量线程白白浪费CPU原地自旋转。

在JDK8中新增了一个LongAdder类,其采用分而治之的策略来减少同一个变量的并发竞争度,LongAdder的核心思想是把一个原子变量分解为多个变量,让同样多的线程去竞争多个资源,这样竞争每个资源的线程数就被分担了下来。

在LongAdder的底层实现中,首先有一个base值,刚开始多线程来不停的累加数值,都是对base进行累加的,比如刚开始累加成了base = 5。接着如果发现并发更新的线程数量过多,就会开始施行分段CAS的机制,也就是内部会搞一个Cell数组,每个数组是一个数值分段。

这时,让大量的线程分别去对不同Cell内部的value值进行CAS累加操作,这样就把CAS计算压力分散到了不同的Cell分段数值中了。这样就可以大幅度的降低多线程并发更新同一个数值时出现的无限循环的问题,大幅度提升了多线程并发更新数值的性能和效率。

而且LongAdder内部实现了自动分段迁移的机制,也就是如果某个Cell的value执行CAS失败了,那么就会自动去找另外一个Cell分段内的value值进行CAS操作。这样也解决了线程空旋转、自旋不停等待执行CAS操作的问题,让一个线程过来执行CAS时可以尽快的完成这个操作。

最后,如果你要从LongAdder中获取当前累加的总值,就会把base值和所有Cell分段数值加起来返回给你。

Analyze Source Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/** Number of CPUS, to place bound on table size */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;

/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;

/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy;

@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

LongAdder维护了一个延迟初始化的原子性更新数组(默认情况下Cell数组是null)和一个基值变量base,由于Cells占用内存是相对比较大的,所以一开始并不创建,而是在需要时候在创建,也就是惰性创建。

当一开始判断cell数组是null并且并发线程较少时候所有的累加操作都是对base变量进行的,这时候就退化为了AtomicLong。cell数组的大小保持是2的N次方大小,初始化时候Cell数组的中Cell的元素个数为2,数组里面的变量实体是Cell类型。

当多个线程在争夺同一个Cell原子变量时候如果失败并不是在当前cell变量上一直自旋CAS重试,而是会尝试在其它Cell的变量上进行CAS尝试,这个改变增加了当前线程重试时候CAS成功的可能性。

LongAdder返回的值是把所有Cell变量的value值累加后再加上base值,如下代码:

1
2
3
4
5
6
7
8
9
10
11
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

QA

  • 何时初始化cell数组?
  • 当前线程如何选择cell中的元素进行访问?
  • 如果保证cell中元素更新的线程安全?
  • cell数组何时进行扩容,cell元素个数可以无限扩张?

Refference

http://ifeve.com/juc1/
https://juejin.im/post/5c062c87e51d451dbc21801b
http://blog.palominolabs.com/2014/02/10/java-8-performance-improvements-longadder-vs-atomiclong/