CAS
原子类
何为原子类
即为java.util.concurrent.atomic包下的所有相关类和API
没有CAS之前
- 多线程环境不使用原子类保证线程安全i++(基本数据类型)
常用synchronized
锁,但是它比较重 ,牵扯到了用户态和内核态的切换,效率不高。
1 | public class T3 |
使用CAS之后
- 多线程情况下使用原子类保证线程安全(基本数据类型)
1 | public class T3 |
- 类似于乐观锁
CAS是什么
CAS基本知识
compare and swap的缩写,中文翻译成比较并交换,实现并发算法时常用到的一种技术。它包含三个操作数——内存位置、预期原值及更新值。
执行CAS操作的时候,将内存位置的值与预期原值比较:
- 如果相匹配,那么处理器会自动将该位置值更新为新值,
- 如果不匹配,处理器不做任何操作,多个线程同时执行CAS操作只有一个会成功。
CAS原理
CAS (CompareAndSwap)
CAS有3个操作数,位置内存值V,旧的预期值A,要修改的更新值B。
当且仅当旧的预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做或重来*
当它重来重试的这种行为成为—自旋
- eg
- 线程A读取了值为5,想要更新为6,想要将值写回的时候发现线程B和C都进行了操作,已经变成了7,这个时候A不能成功,可能会发生自旋
CASDemo代码
多线程情况下使用原子类保证线程安全(基本数据类型)
1 | public class CASDemo { |
硬件级别保证
对总线加锁,效率比synchronized效率高。
CAS是JDK提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。
它是非阻塞的且自身原子性,也就是说这玩意效率更高且通过硬件保证,说明这玩意更可靠。
CAS是一条CPU的原子指令 (
cmpxchg指令
),不会造成所谓的数据不一致问题,Unsafe
提供的CAS方法
(如compareAndSwapXXX)底层实现即为CPU指令cmpxchg。执行cmpxchg指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功
加锁成功之后会执行cas操作,也就是说CAS的原子性实际上是CPU实现独占的 , 其实在这一点上还是有排他锁的
只是比起用synchronized, 这里的排他时间要短的多, 所以在多线程情况下性能会比较好
源码分析
jdk8
1 | //compareAndSet |
1 | public final native boolean compareAndSwapObject(Object o, long offset, |
上面三个方法都是类似的,主要对4个参数做一下说明。
- o:表示要操作的对象
- offset:表示要操作对象中属性地址的偏移量
- expected:表示需要修改数据的期望的值
- x:表示需要修改为的新值
jdk17
1 | private static final Unsafe U = Unsafe.getUnsafe(); |
引出来一个问题:Unsafe类是什么?
CAS底层原理?如果知道,谈谈你对UnSafe的理解
UnSafe
1 | public class AtomicInteger extends Number implements java.io.Serializable { |
Unsafe
CAS这个理念 ,落地就是Unsafe类
它是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门 ,基于该类可以直接操作特定内存\ 的数据 。Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于Unsafe类的方法。
注意Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务 。
打开rt.jar包(最基本的包)
变量
valueOffset
,表示该变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的。1
2
3public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}变量value用volatile修饰,保证了多线程之间的内存可见性。
我们知道i++线程不安全的,那atomicInteger.getAndIncrement()
CAS的全称为Compare-And-Swap,它是一条CPU并发原语。
它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。
AtomicInteger 类主要利用 CAS (compare and swap)
+ volatile
和 native
方法来保证原子操作,从而避免 synchronized
的高开销,执行效率大为提升。
- jdk8
jdk17
CAS并发原语体现在JAVA语言中就是sun.misc.Unsafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令 。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于CAS是一种系统原语 ,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。
源码分析
若在OpenJDK源码中查看Unsafe.java
这里while体现了自旋的思想
假如是ture,取反false退出循环;假如是false,取反true要继续循环。
原理
- 假设线程A和线程B两个线程同时执行getAndAddInt操作(分别跑在不同CPU上):
- AtomicInteger里面的value原始值为3,即主内存中AtomicInteger的value为3,根据JMM模型,线程A和线程B各自持有一份值为3的value的副本分别到各自的工作内存。
- 线程A通过getIntVolatile(var1, var2)拿到value值3,这时线程A被挂起 。
- 线程B也通过getIntVolatile(var1, var2)方法获取到value值3,此时刚好线程B没有被挂起并执行compareAndSwapInt方法比较内存值也为3,成功修改内存值为4,线程B打完收工,一切OK。
- 这时线程A恢复,执行compareAndSwapInt方法比较,发现自己手里的值数字3和主内存的值数字4不一致,说明该值已经被其它线程抢先一步修改过了,那A线程本次修改失败,只能重新读取重新来一遍了。
- 线程A重新获取value值,因为变量value被volatile修饰,所以其它线程对它的修改,线程A总是能够看到,线程A继续执行compareAndSwapInt进行比较替换,直到成功。
底层汇编
了解即可
Unsafe
类中的compareAndSwapInt
,是一个本地方法,该方法的实现位于unsafe.cpp
中*核心
(Atomic::cmpxchg(x, addr, e)) == e;
1
2
3
4
5
6
7
8
9
10
11
12
13UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv* env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
// 先想办法拿到变量value在内存中的地址,根据偏移量valueOffset,计算 value 的地址
jint* addr = (jint* ) index_oop_from_field_offset_long(p, offset);
// 调用 Atomic 中的函数 cmpxchg来进行比较交换,其中参数x是要交换的值,e是要比较的值
//cas成功,返回期望值e,等于e,此方法返回true;
//cas失败,返回内存中的value值,不等于e,此方法返回false
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
//-------------核心(Atomic::cmpxchg(x, addr, e)) == e;
//JDK提供的CAS机制,在汇编层级会禁止变量两侧的指令优化,然后使用cmpxchg指令比较并更新变量值(原子性)
再看看
cmpxchg
里面是什么1
2// 调用 Atomic 中的函数 cmpxchg来进行比较交换,其中参数x是即将更新的值,参数e是原内存的值
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;1
2
3
4
5
6unsigned Atomic::cmpxchg(unsigned int exchange_value,volatile unsigned int* dest, unsigned int compare_value) {
assert(sizeof(unsigned int) == sizeof(jint), "more work to do");
//根据操作系统类型调用不同平台下的重载函数,这个在预编译期间编译器会决定调用哪个平台下的重载函数
return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest, (jint)compare_value);
}不同的操作系统下会调用不同的compxchg重载函数,例如win10
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
//判断是否是多核CPU
int mp = os::is_MP();
__asm {
//三个move指令表示的是将后面的值移动到前面的寄存器上
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
//CPU原语级别,CPU触发
LOCK_IF_MP(mp)
//比较并交换指令
//cmpxchg: 即“比较并交换”指令
//dword: 全称是 double word 表示两个字,一共四个字节
//ptr: 全称是 pointer,与前面的 dword 连起来使用,表明访问的内存单元是一个双字单元
//将 eax 寄存器中的值(compare_value)与 [edx] 双字内存单元中的值进行对比,
//如果相同,则将 ecx 寄存器中的值(exchange_value)存入 [edx] 内存单元中
cmpxchg dword ptr [edx], ecx
}
}总结
你只需要记住:CAS是靠硬件实现的从而在硬件层面提升效率,最底层还是交给硬件来保证原子性和可见性
实现方式是基于硬件平台的汇编指令,在intel的CPU中(X86机器上),使用的是汇编指令cmpxchg指令。
核心思想就是:比较要更新变量的值V(内存offset上的值)和预期值E(volatile取得的值)(compare),相等才会将V的值设为新值N(swap/set)。如果不相等自旋再来。
自定义原子引用
譬如AtomicInteger原子整型,可有其他原子类型,比如AtomicBook、AtomicOrder
丢入泛型中
Class AtomicReference<V>
eg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class AtomicReferenceDemo {
public static void main(String[] args) {
User z3 = new User("z3",24);
User li4 = new User("li4",26);
AtomicReference<User> atomicReference = new AtomicReference<>();
atomicReference.set(z3);
System.out.println(atomicReference.compareAndSet(z3, li4)+"\t"+atomicReference.get().toString());
System.out.println(atomicReference.compareAndSet(z3, li4)+"\t"+atomicReference.get().toString());
}
}
class User{
String userName;
int age;
}
//true User(userName=li4, age=26)
//false User(userName=li4, age=26
CAS与自旋锁,借鉴CAS思想
CAS落地的重要应用-自旋锁
是什么
自旋锁(spinlock)
是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁 ,
当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。
若在OpenJDK源码中查看Unsafe.java
这里while体现了自旋的思想
假如是ture,取反false退出循环;假如是false,取反true要继续循环,尝试CAS操作,直到返回true。
自己实现一个自旋锁SpinLockDemo
题目:实现一个自旋锁
自旋锁好处:循环比较获取没有类似wait的阻塞。通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁5秒钟,B随后进来后发现
当前有线程持有锁,不是null,所以只能通过自旋等待,直到A释放锁后B随后抢到。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43/**
* @Author: yumo
* @Description: 实现一个自旋锁
* 自旋锁好处:循环比较获取没有类似wait的阻塞。
* 通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁2秒钟,B随后进来后发现
* 当前有线程持有锁,不是null,所以只能通过自旋等待,直到A释放锁后B随后抢到。
* @DateTime: 2023/2/17 15:25
**/
public class SpinLockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void lock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"\tcome in,lock");
while (!atomicReference.compareAndSet(null, thread)) {}
}
public void unlock(){
Thread thread = Thread.currentThread();
while (!atomicReference.compareAndSet(thread, null)) {}
System.out.println(Thread.currentThread().getName()+"\tend,unlock");
}
public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
new Thread(() -> {
spinLockDemo.lock();
try{
TimeUnit.SECONDS.sleep(2);
}catch(InterruptedException e){
e.printStackTrace();
}
spinLockDemo.unlock();
},"A").start();
try{ TimeUnit.MILLISECONDS.sleep(50); }catch(InterruptedException e){ e.printStackTrace(); }
new Thread(() -> {
spinLockDemo.lock();
spinLockDemo.unlock();
},"B").start();
}
}
CAS缺点
1 循环时间长开销很大
do while
如果它一直自旋会一直占用CPU时间,造成较大的开销
如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销。
2 引出来ABA问题
什么是ABA问题
CAS会导致“ABA问题”。
CAS算法实现一个重要前提需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差类会导致数据的变化。
比如说一个线程one从内存位置V中取出A,这时候另一个线程two也从内存中取出A,并且线程two进行了一些操作将值变成了B,
然后线程two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中仍然是A,然后线程one操作成功。
尽管线程one的CAS操作成功,但是不代表这个过程就是没有问题的。
如何解决
AtomicStampedReference版本号 (注意区分前面的**Class AtomicReference
**) Class AtomicStampedReference
相关API 1
2AtomicStampedReference(V initialRef, int initialStamp)
创建一个新的 AtomicStampedReference与给定的初始值。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public boolean weakCompareAndSet(V expectedReference,//旧值
V newReference,//新值
int expectedStamp,//旧版本号
int newStamp)//新版本号
以原子方式设置该引用和邮票给定的更新值的值,如果当前的参考是==至预期的参考,并且当前标志等于预期标志。
May fail spuriously and does not provide ordering guarantees ,所以只是很少适合替代compareAndSet 。
参数
expectedReference - 参考的预期值
newReference - 参考的新值
expectedStamp - 邮票的预期值
newStamp - 邮票的新值
结果
true如果成功
```*eg
- 基本情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Book{
private int id;
private String bookName;
}
public class AtomicStampedDemo {
public static void main(String[] args) {
Book javaBook = new Book(1, "javaBook");
AtomicStampedReference<Book> stampedReference = new AtomicStampedReference<>(javaBook,1);
System.out.println(stampedReference.getReference()+"\t"+stampedReference.getReference());
Book mysqlBook = new Book(2, "mysqlBook");
boolean b;
b= stampedReference.compareAndSet(javaBook, mysqlBook, stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println(b+"\t"+stampedReference.getReference()+"\t"+stampedReference.getStamp());
}
}
//Book(id=1, bookName=javaBook) Book(id=1, bookName=javaBook)
//true Book(id=2, bookName=mysqlBook) 2- ABA复现(单线程情况下)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public class AtomicStampedDemo {
public static void main(String[] args) {
Book javaBook = new Book(1, "javaBook");
AtomicStampedReference<Book> stampedReference = new AtomicStampedReference<>(javaBook,1);
System.out.println(stampedReference.getReference()+"\t"+stampedReference.getReference());
Book mysqlBook = new Book(2, "mysqlBook");
boolean b;
b= stampedReference.compareAndSet(javaBook, mysqlBook, stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println(b+"\t"+stampedReference.getReference()+"\t"+stampedReference.getStamp());
b= stampedReference.compareAndSet(mysqlBook,javaBook, stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println(b+"\t"+stampedReference.getReference()+"\t"+stampedReference.getStamp());
}
}
//Book(id=1, bookName=javaBook) Book(id=1, bookName=javaBook) --------
//true Book(id=2, bookName=mysqlBook) 2
//true Book(id=1, bookName=javaBook) 3 --------虽然1.3行内容是一样的,但是版本号不一样ABA复现(多线程情况下)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51public class ABADemo
{
static AtomicInteger atomicInteger = new AtomicInteger(100);
static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1);
public static void main(String[] args)
{
new Thread(() -> {
atomicInteger.compareAndSet(100,101);
atomicInteger.compareAndSet(101,100);//这里 中间就有人动过了,虽然值是不变的,假如不检查版本号,CAS就直接能成功了
},"t1").start();
new Thread(() -> {
//暂停一会儿线程
try { Thread.sleep( 500 ); } catch (InterruptedException e) { e.printStackTrace(); };
System.out.println(atomicInteger.compareAndSet(100, 2022)+"\t"+atomicInteger.get());
},"t2").start();
//-------------------- true-2022
//暂停一会儿线程,main彻底等待上面的ABA出现演示完成。
try { Thread.sleep( 2000 ); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("============以下是ABA问题的解决=============================");
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName()+"\t 首次版本号:"+stamp);//1-----------初始获得一样的版本号
//暂停500毫秒,保证t4线程初始化拿到的版本号和我一样,
try { TimeUnit.MILLISECONDS.sleep( 500 ); } catch (InterruptedException e) { e.printStackTrace(); }
atomicStampedReference.compareAndSet(100,101,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println(Thread.currentThread().getName()+"\t 2次版本号:"+atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println(Thread.currentThread().getName()+"\t 3次版本号:"+atomicStampedReference.getStamp());
},"t3").start();
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();//记录一开始的版本号,并且写死
System.out.println(Thread.currentThread().getName()+"\t 首次版本号:"+stamp);//1------------初始获得一样的版本号
//暂停1秒钟线程,等待上面的t3线程,发生了ABA问题
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
boolean result = atomicStampedReference.compareAndSet(100,2019,stamp,stamp+1);//这个还是初始的版本号,但是实际上版本号被T3修改了,所以肯定会失败
System.out.println(Thread.currentThread().getName()+"\t"+result+"\t"+atomicStampedReference.getReference());
},"t4").start();
}
}
//t3 首次版本号:1
//t4 首次版本号:1
//t3 2次版本号:2
//t3 3次版本号:3
//false 100 3 -----因为版本号实际上已经被修改了
总结:版本号+比较要一起上
原子操作类
是什么
都是java.util.concurrent.atomic
包下的
有红框圈起来的,也有蓝框圈起来的,为什么?
阿里巴巴Java开发手册
为什么说18罗汉增强,却只有16个
再分类
基本类型原子类
- AtomicInteger
- AtomicBoolean
- AtomicLong
常用API
1 | public final int get() |
Case-CountDownLatch
案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27class MyNumber{
AtomicInteger atomicInteger = new AtomicInteger();
public void addPlusPlus(){
atomicInteger.getAndIncrement();
}
}
public class AtomicIntegerDemo {
public static final int SIZE = 50;
public static void main(String[] args) {
MyNumber myNumber = new MyNumber();
for(int i = 1;i <= SIZE;i ++){
new Thread(() -> {
for(int j = 1;j <= 1000;j ++){
myNumber.addPlusPlus();
}
},String.valueOf(i)).start();
}
System.out.println(Thread.currentThread().getName()+"\t"+"result: "+myNumber.atomicInteger);
}
}
//本来应该是50000
//1试-main result: 39000
//2试-main result: 40178
//?是不是我们的程序有问题?
//因为上面的50* 1000个计算还没结束,他就去get数值了解决
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43//方法一(不推荐,做做Demo还行)
public class AtomicIntegerDemo {
public static final int SIZE = 50;
public static void main(String[] args) {
MyNumber myNumber = new MyNumber();
for(int i = 1;i <= SIZE;i ++){
new Thread(() -> {
for(int j = 1;j <= 1000;j ++){
myNumber.addPlusPlus();
}
},String.valueOf(i)).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"\t"+"result: "+myNumber.atomicInteger);
}
}
//方法二-减法计数器CountDownLatch
public class AtomicIntegerDemo {
public static final int SIZE = 50;
public static void main(String[] args) throws InterruptedException {
MyNumber myNumber = new MyNumber();
CountDownLatch countDownLatch = new CountDownLatch(SIZE);
for(int i = 1;i <= SIZE;i ++){
new Thread(() -> {
try {
for(int j = 1;j <= 1000;j ++){
myNumber.addPlusPlus();
}
} finally {
countDownLatch.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"\t"+"result: "+myNumber.atomicInteger);
}
}
//main result: 50000
数组类型原子类
基本原理同上,不做过多演示
- AtomicIntegerArray
- AtomicLongArray
- AtomicRreferenceArray
Case
1 | public class AtomicIntegerArrayDemo |
引用类型原子类
这三个相对比较重要
AtomicReference
AtomicStampedReference
AtomicMarkableReference
AtomicReference
可以带泛型(前面讲过)AtomicReference<xxx>
AtomicStampedReference
带版本号以防CAS中的ABA问题(前面讲过)- 携带版本号的引用类型原子类,可以解决ABA问题。解决修改过几次的问题。*
AtomicMarkableReference
类似于上面的 ,但解决一次性问题- 构造法
AtomicMarkableReference(V initialRef, boolean initialMark)
- 原子更新带有标记位的引用类型对象
- 解决是否修改过,它的定义就是将
状态戳
简化为true|false
,类似一次性筷子
- 构造法
1 | public class AtomicMarkableReferenceDemo { |
对象的属性修改原子类
关键词FieldUpdater
- AtomicIntegerFieldUpdater//原子更新对象中int类型字段的值
- AtomicLongFieldUpdater//原子更新对象中Long类型字段的值
- AtomicReferenceFieldUpdater//原子更新引用类型字段的值
更加细粒度范围内的原子更新
使用目的
- 以一种线程安全方式操作非线程安全对象内的某些字段
举个例子(它是更加细粒度的/影像某个字段,而不用锁住整个对象)
使用要求
- 更新的对象属性必须使用public volatile修饰符
- 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法
newUpdater()
创建一个更新器,并且需要设置想要更新的类和属性。
Case
AtomicIntegerFieldUpdater
-这个针对int类型
1 | public class AtomicIntegerFieldUpdaterDemo { |
AtomicReferenceFieldUpdater
-适用度更广
1 | //比如这个案例中是针对boolean类型的 |
面试
面试官问你:你在哪里用了volatile?
在AtomicReferenceFieldUpdater中,因为是规定好的必须由volatile修饰的
还有的话之前我们在DCL单例中,也用了volatile保证了可见性
原子操作增强类原理深度解析
开篇的时候我们将原子类分为了红框和蓝框,这里就是蓝框的内容
这几个都是java8开始有的,前面的都是java5就有了
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
阿里要命题目
- 热点商品点赞计算器,点赞数加加统计,不要求实时精确
- 一个很大的List,里面都是int类型,如何实现加加,说说思路
模拟下点赞计数器,看看性能
要求:热点商品点赞计算器,点赞数加加统计,不要求实时精确
看看这个LongAccumulator
常用API
入门讲解
LongAdder只能用来计算加法 。且从零开始计算
LongAccumulator提供了自定义的函数操作 (利用lambda表达式)
1 | public class LongAdderAPIDemo { |
LongAdder高性能对比Code演示
1 | /** |
源码、原理分析
架构
LongAdder是Striped64的子类
1 | public class LongAdder extends Striped64 implements Serializable { |
原理(LongAdder为什么这么快)
官网说明
Striped64
重要的成员函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17//Number of CPUS, to place bound on table size
// CPU数量,即cells数组的最大长度
static final int NCPU = Runtime.getRuntime().availableProcessors();
//Table of cells. When non-null, size is a power of 2.
//单元格数组|cells数组,为2的幂,2,4,8,16.....,方便以后位运算
transient volatile Cell[] cells;
//基础value值,当并发较低时,只累加该值主要用于没有竞争的情况,通过CAS更新。
//Base value, used mainly when there is no contention, but also as
//a fallback during table initialization races. Updated via CAS.
transient volatile long base;
//创建或者扩容Cells数组时使用的自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁。
//Spinlock (locked via CAS) used when resizing and/or creating Cells.
transient volatile int cellsBusy;最重要的两个
Striperd64中一些变量或者方法的定义
Cell
是java.util.concurrent.atomic下Striped64的一个静态内部类
1 | static final class Cell { .misc.Contended |
LongAdder为什么这么快.
其实在小并发下情况差不多;但在高并发情况下,在
AtomicLong
中,等待的线程会不停的自旋,导致效率比较低;而LongAdder
用cell[]
分了几个块出来,最后统计总的结果值(base+所有的cell值),分散热点。举个形象的例子,火车站买火车票,
AtomicLong
只要一个窗口,其他人都在排队;而LongAdder
利用cell
开了多个卖票窗口,所以效率高了很多。
一句话
LongAdder的基本思路就是分散热点 ,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点 。
数学表达
- 内部有一个base变量,一个Cell[]数组。
- base变量:非竞态条件下,直接累加到该变量上
- Cell[]数组:竞态条件下,累加各个线程自己的槽Cell[i]中
源码解读深度分析
LongAdder
在无竞争的情况,跟AtomicLong
一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零分散热点的做法,从空间换时间,用一个数组 cells,将一个value拆分进这个数组cells。
多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。
LongAdder.increment()
add(1L)
1 | public class LongAdder extends Striped64 implements Serializable { |
uncontended代表没有冲突。
我们点进这个casBase发现他也是个CAS
1 | final boolean casBase(long cmp, long val) { |
一开始竞争小的时候CAS能成功,也就是casBase能成功,然后cells也是空的,所以不会进到循环
竞争大的时候,他会
Cell[] rs = new Cell[2];
新建两个cell, 此时≠ null ,条件满足了,进入循环。
然后这里还有一层循环,这里是多个if并排
- 总结一下
- 最初无竞争时只更新base;
- 如果更新base失败后,首次新建一个Cell[]数组
- 当多个线程竞争同一个Cell比价激烈时,可能就要利用
longAccumulate
对Cell[]扩容。
- 如果Cells表为空,尝试用CAS更新base字段,成功则退出:
- 如果Cells表为空,CAS更新base字段失败,出现竞争,uncontended为true,调用longAccumulate;
- 如果Cells表非空,但当前线程映射的槽为空,uncontended为true,调用longAccumulate;
- 如果Cells表非空,且前线程映射的槽非空,CAS更新Ce的值,成功则返回,否则,uncontended设为false,调用longAccumulate。
longAccumulate()
1 | final void longAccumulate(long x, LongBinaryOperator fn, |
LongAccumulate入参说明
Striped64中一些变量或者方法的定义
步骤
(a = as[getProbe() & m])
里的probe,这里得到了了hash值,通过hash值知道我们去到哪个cell槽1
2
3
4
5static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
//其实就是得到了线程的Hash值所以最前面的这一段就像是新员工入职获取工号(hash值)一样
总纲
上述代码首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支:
- CASE1:cells[]数组已经初始化
- CASE2:cells[]数组未初始化(首次新建)
- CASE3:cells[]数组正在初始化中
计算
CASE2:刚刚要初始化cells[]数组(首次新建)
如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,
rs[h & 1] = new Cell(x) 表示创建一个新的Cell元素,value是x值,默认为1。
h & 1类似于我们之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1)。同hashmap一个意思。CASE3:cells[]数组正在初始化中。兜底
多个线程尝试CAS修改失败的线程会走到这个分支
1
2
3
4
5// Fall back on using base
else if (casBase(v = base,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
//该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。若一个线程cas操作为false,则继续自旋
CASE1:cells[]数组已经初始化且可能存在数组扩容
多个线程同时命中一个cell的竞争,这个是最复杂的部分
-
- 上面代码判断当前线程hash后指向的数据位置元素是否为空,
- 如果为空则将Cell数据放入数组中,跳出循环。
- 如果不空则继续循环。
-
说明当前线程对应的数组中有了数据,也重置过hash值,
这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环。
-
以上六步总结
sum()
1 | //LongAdder.java |
sum()会将所有Cell数组中的value和base累加作为返回值。
核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点 。
为啥在并发情况下sum的值不精确?
sum执行时,并没有限制对base和cells的更新(一句要命的话)。所以LongAdder不是强一致性的,它是最终一致性的。
- 首先,最终返回的sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了 ,而此时局部变量sum不会更新,造成不一致。
- 其次,这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法在没有并发的情况下,可以获得正确的结果。
使用总结
- AtomicLong
- 线程安全,可允许一些性能损耗,要求高精度时可使用
- 保证精度,性能代价
- AtomicLong是多个线程针对单个热点值value进行原子操作
- LongAdder
- 当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用
- 保证性能,精度代价
- LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作
小总结
AtomicLong
原理
- CAS+自旋
- incrementAndGet
场景
- 低并发下的全局计算
- AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题
缺陷
- 高并发后性能急剧下降
- why?AtomicLong的自旋会称为瓶颈(N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。)
LongAdder
原理
- CAS+Base+Cell数组分散
- 空间换时间并分散了热点数据
场景
- 高并发的全局计算
缺陷
- sum求和后还有计算线程修改结果的话,最后结果不够准确