您的位置 首页 java

java并发线程深入理解CAS以及ABA问题的处理

一、什么是CAS

CAS(Compare And Swap,比较并交换),通常指的是这样一种 原子操作 :针对一个变量,首先比较它的内存值与某个期望值是否相同,如果相同,就给它赋一个新值。

CAS实现过程如下图

1、一个初始值变量V,值为5;一开始先读取V实际内存中的值赋值给E
2、比如我们需要给最原始的V+1操作,那么此时用E+1来进行操作(这是防止V在其他线程已经被改变),这样完成了U=E+1的操作
3、判断E和V的值是否一致,如果一致则证明在以上操作过程中V没有被其他 线程 改变则将U的值赋值给V,如果不一致那V就被其他改变了,这样给U的+1操作就不成立,返回当前的V。

CAS 的逻辑用伪代码描述如下:

 if (value == expectedValue) {     
    value = newValue;  

以上 伪代码 描述了一个由比较和赋值两阶段组成的复合操作,CAS 可以看作是它们合并后的整体——一个不可分割的原子操作,并且其原子性是直接在硬件层面得到保障的。

CAS可以看做是乐观锁(对比数据库的悲观、 乐观锁 )的一种实现方式, Java 原子类中的递增操作就通过CAS自旋实现的。

CAS是一种无锁算法,在不使用锁(没有线程被阻塞)的情况下实现 多线程 之间的变量同步。

1-2、CAS应用

在java并发线程中,我们可以使用以下方式进行加锁处理:

1、synchronize 在并发竞争比较激烈的时候不推荐使用,会让未获得锁的线程阻塞,因为会切换到内核态,进行park,这样就会有性能问题
2、使用Reentrant lock lock.lock(加锁) lock.unlock(解锁),需要注意的是unlock需要放到finally中,防止代码块异常,抛出后锁一直存在。
3、CAS也可以实现线程锁机制。

在 Java 中,CAS 操作是由 Unsafe 类提供支持的,该类定义了三种针对不同类型变量的 CAS 操作,如图

它们都是 native 方法,由 Java 虚拟机 提供具体实现,这意味着不同的 Java 虚拟机对它们的实现可能会略有不同。

以 compareAndSwapInt 为例,Unsafe 的 compareAndSwapInt 方法接收 4 个参数,分别是:对象实例、内存偏移量、字段期望值、字段新值。该方法会针对指定对象实例中的相应偏移量的字段执行 CAS 操作。

1-3、并发锁的实现

1-3-1、使用 synchronized

我们都知道用synchronized可以实现锁机制,但是在并发量大的时候,会**锁竞争的现象,这样就会涉及到用户态到内核态的切换**,这是极其耗费性能的,因此建议在并发量不大的情况下,可以使用synchronized进行加锁。并发量大的时候,不建议使用

 public class ThreadLockSynchronized {
     private  volatile  static  int count = 0;

    static Object object = "";

    public static  void  main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (object) {
                        for (int j = 0; j < 10000; j++) {
                            count++;
                        }
                    }
                }
            });
            thread.start();
        }
        try {
            Thread.sleep(1000);
        } catch (Interrupted Exception  e) {
            e.printStackTrace();
        }
        System.out.println("count:" + count);
    }
}  

以上代码测试结果如下:

1-3-2、使用 reentrant Lock

使用ReentrantLock的时候一定要注意, 要将unlock()放到finally代码块中,防止业务代码异常,无法释放锁

 public class ThreadLockReentrantLock {
    private volatile static int count = 0;

    static ReentrantLock reentrantLock=new ReentrantLock();

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    //使用reentrantLock.lock();进行加锁操作
                    reentrantLock.lock();
                    try {
                        for (int j = 0; j < 10000; j++) {
                            count++;
                        }
                    } finally {
                        //reentrantLock.unlock();一定要放到finally中,防止业务代码异常,导致锁不释放
                        reentrantLock.unlock();
                    }
                }
            });
            thread.start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("count:" + count);
    }
}  

1-3-3、使用CAS

1-3-3-1、通过反射获得Unsafe

Unsafe是 jdk 提供的工具类,我们要使用需要通过 反射机制 取到,同时获得其获取偏移量的操作

 public class UnsafeFactory {
    /**
     * 获取 Unsafe 对象
     * @return
     */    public static Unsafe getUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 获取字段的内存偏移量
     * @param unsafe
     * @param clazz
     * @param fieldName
     * @return
     */    public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
        try {
            return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }
}  

1-3-3-2、通过CAS实现对变量的修改

首先定义一个对象Entity,其中定义一个int类型变量X,然后通过CAS对X进行修改。

 public class CASTest {
    public static void main(String[] args) {
        Entity entity = new Entity();

        //通过反射机制获得Unsafe
        Unsafe unsafe = UnsafeFactory.getUnsafe();

        //获得x内存中的偏移量
        long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");
        System.out.println(offset);
         boolean  successful;

        // 4个参数分别是:对象实例、字段的内存偏移量、字段原值、字段更新值
        //通过CAS将x由0改为3
        successful = unsafe.compareAndSwapInt(entity, offset, 0, 3);
        System.out.println(successful + "t" + entity.x);

        //通过CAS将x由3改为5
        successful = unsafe.compareAndSwapInt(entity, offset, 3, 5);
        System.out.println(successful + "t" + entity.x);

        //通过CAS将x由3改为8--本条是不成立的,上面已经将x改为5
        successful = unsafe.compareAndSwapInt(entity, offset, 3, 8);
        System.out.println(successful + "t" + entity.x);

    }

}

class Entity{
    int x;
}  

运行结果如下:

首先获得X的偏移量为12,其次就是对x进行修改的结果打印。第三次修改失败,因为第二次已经将x改为5,如果在用第一次的3作为原值去修改,就会修改失败。

java并发线程深入理解CAS以及ABA问题的处理

1-3-3-3、CAS特点

根据以上执行结果,就证明了CAS的特点: 先比较、后更新 ,这两步,底层会帮助我们实现 原子操作,有序性和可见性 避免上线文切换。

1-4、CAS源码分析

1-4-1、java层代码

Unsafe类中提供了三种CAS操作如下,这三种都是native方法,都是 Hotspot 代码,

java并发线程深入理解CAS以及ABA问题的处理

下面用compareAndSwapInt方法来举例说明

 public final native boolean compareAndSwapInt(Object var1,  long  var2, int var4, int var5);  

方法参数:

1、对象实例
2、字段内存值的偏移量(根据对象实例和偏移量就可以获得对应的变量)
3、期望对象实例中的原值
4、字段更新值

1-4-2、Hotspot层

1-4-2-1、Unsafe_CompareAndSwapInt方法

调用Hotspot方法源码如下:

 #unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
// 根据偏移量,计算value的地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
// Atomic::cmpxchg(x, addr, e) cas逻辑 x:要交换的值   e:要比较的值
//cas成功,返回期望值e,等于e,此方法返回true 
//cas失败,返回内存中的value值,不等于e,此方法返回false
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;  

1-4-2-1-1、方法解析

首先调用的方法

Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)

前面两个值是hotspot传入的值,后面四个值为java方法传入进来的。

其次根据偏移量,计算value的地址

 //获得对象
oop p = JNIHandles::resolve(obj); 
// 根据偏移量,计算value的地址 
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);  

然后调用Atomic::cmpxchg实现CAS操作

 // Atomic::cmpxchg(x, addr, e) cas逻辑 x:要交换的值   e:要比较的值
//cas成功,返回期望值e,等于e,此方法返回true 
//cas失败,返回内存中的value值,不等于e,此方法返回false
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;  

1-4-2-2、Atomic::cmpxchg方法

需要注意下面的代码是Linux_x86,不同系统处理CAS是不同的

 #atomic_linux_x86.inline.hpp
inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
//判断当前执行环境是否为多处理器环境
int mp = os::is_MP();
//LOCK_IF_MP(%4) 在多处理器环境下,为 cmpxchgl 指令添加 lock 前缀,以达到内存屏障的效果
//cmpxchgl 指令是包含在 x86 架构及 IA-64 架构中的一个原子条件指令,
//它会首先比较 dest 指针指向的内存值是否和 compare_value 的值相等,
//如果相等,则双向交换 dest 与 exchange_value,否则就单方面地将 dest 指向的内存值交给exchange_value。
//这条指令完成了整个 CAS 操作,因此它也被称为 CAS 指令。
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", " memory ");
return exchange_value;          

现代处理器指令集架构基本上都会提供 CAS 指令,例如 x86 和 IA-64 架构中的 cmpxchgl 指令和 comxchgq 指令,sparc 架构中的 cas 指令和 casx 指令。

不管是 Hotspot 中的 Atomic::cmpxchg 方法,还是 Java 中的 compareAndSwapInt 方法,它们本质上都是对相应平台的 CAS 指令的一层简单封装。CAS 指令作为一种硬件原语,有着天然的原子性,这也正是 CAS 的价值所在。

1-5、通过CAS实现锁

我们再回到一开始的代码中,该如何实现锁呢

 public class ThreadLockCAS {
    private volatile static int count = 0;
    
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 10000; j++) {
                        count++;
                    }
                }
            });
            thread.start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("count:" + count);
    }
}  

1-5-1、实现思路

通过上对CAS的理解,我们就可以这么操作:

1、设置一个中间值为0
2、第一个线程进来的时候通过CAS将中间值改为1
3、其他线程进来 再次准备将0改为1的时候就会失败
4、第一个线程业务操作完毕,再次通过cas将中间值改为0
5、下一个线程再次通过CAS将0改为1就会成功,获得锁

1-5-2、实现一个CAS比较与交换的帮助类

1-5-2-1、实现加锁的类

其中UnsafeFactory类的代码在上面已经贴过,这个地方就不再贴了。这个类中主要就是cas()这个方法。这就是CAS方式加锁的操作。

State 就是作为改变交换的值。

 public class CASLock {
    private volatile int state;
    private static final Unsafe UNSAFE;
    private static final long OFFSET;

    static {
        try {
            UNSAFE= UnsafeFactory.getUnsafe();
            OFFSET=UnsafeFactory.getFieldOffset(UNSAFE,CASLock.class,"state");
        } catch (Exception e) {
            throw  new Error(e);
        }
    }
    //设置CAS操作
    public boolean cas(){
        return UNSAFE.compareAndSwapInt(this,OFFSET,0,1);
    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }
}  

1-5-2-2、使用锁实现

如下代码,在第一个 for循环 中创建线程,然后线程中实现了一个自旋操作,这样第一个线程进入的锁之后,其他线程都在进行自旋操作,等第一个线程通过casLock.setState(0),释放锁的时候,下一个线程casLock.getState()==0 && casLock.cas()就可以成立。

 public class ThreadLockCAS {
    private volatile static int count = 0;
    static CASLock casLock = new CASLock();

    public static void main(String[] args) {

        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                for (; ; ) {
                    //state=0
                    if (casLock.getState() == 0 && casLock.cas()) {
                        try {
                            for (int j = 0; j < 10000; j++) {
                                count++;
                            }
                        } finally {
                            casLock.setState(0);
                        }
                        break;
                    }
                }
            });
            thread.start();
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(count);
    }
}  

虽然通过以上代码实现了加锁操作,由于使用了自旋操作,这样等待的线程就会一直空转,消耗CPU资源,显然这样得实现方式是不太友好的

1-6、CAS缺陷

CAS 虽然高效地解决了原子操作,但是还是存在一些缺陷的,主要表现在三个方面:

  • 自旋 CAS 长时间地不成功,则会给 CPU 带来非常大的开销
  • 只能保证一个共享变量原子操作
  • ABA 问题

1-6-1、ABA问题及解决方案

CAS算法实现一个重要前提需要取出内存中某时刻的数据,而在下时刻比较并替换,那么在这个时间差类会导致数据的变化。

1-6-1-1、什么是ABA问题

当有多个线程对一个原子类进行操作的时候,某个线程在短时间内将原子类的值A修改为B,又马上将其修改为A,此时其他线程不感知,还是会修改成功。

java并发线程深入理解CAS以及ABA问题的处理

测试ABA问题

代码运行过程:
第一个线程要从1改为3,进入线程之后等待1秒。
第二个线程将1改为2,然后又将2改为1
第一个线程从1改为3,(因为原值还是1,这样就修改成功了)- 实际上此1非彼1

 public class ABATest {
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(1);
        
        new Thread(() -> {
            int value = atomicInteger.get();
            System.out.println("Thread1 read value: " + value);

            // 阻塞1s
            LockSupport.parkNanos(1000000000L);

            // Thread1通过CAS修改value值为3
            if (atomicInteger.compareAndSet(value, 3)) {
                System.out.println("Thread1 update from " + value + " to 3");
            } else {
                System.out.println("Thread1 update fail!");
            }
        }, "Thread1").start();

        new Thread(() -> {
            int value = atomicInteger.get();
            System.out.println("Thread2 read value: " + value);
            // Thread2通过CAS修改value值为2
            if (atomicInteger.compareAndSet(value, 2)) {
                System.out.println("Thread2 update from " + value + " to 2");

                // do something
                value = atomicInteger.get();
                System.out.println("Thread2 read value: " + value);
                // Thread2通过CAS修改value值为1
                if (atomicInteger.compareAndSet(value, 1)) {
                    System.out.println("Thread2 update from " + value + " to 1");
                }
            }
        }, "Thread2").start();
    }
}  

运行结果如下:

Thread1不清楚Thread2对value的操作,误以为value=1没有修改过

1-6-1-2、ABA问题的解决方案

数据库有个锁称为乐观锁,是一种基于数据版本实现数据同步的机制,每次修改一次数据,版本就会进行累加。

AtomicStampedReference解决CAS的ABA问题

同样,Java也提供了相应的原子引用类AtomicStampedReference

reference即我们实际存储的变量,stamp是版本,每次修改可以通过+1保证版本唯一性。这样就可以保证每次修改后的版本也会往上递增。

AtomicMarkableReference解决CAS的ABA问题

可以理解为上面AtomicStampedReference的简化版,就是不关心修改过几次,仅仅关心是否修改过。因此变量mark是boolean类型,仅记录值是否有过修改。

使用AtomicStampedReference,利用版本号解决ABA问题

 public class AtomicStampedReferenceTest {
    public static void main(String[] args) {
        // 定义AtomicStampedReference    Pair.reference值为1, Pair.stamp为1
        AtomicStampedReference atomicStampedReference = new AtomicStampedReference(1,1);

        new Thread(()->{
            int[] stampHolder = new int[1];
            int value = (int) atomicStampedReference.get(stampHolder);
            int stamp = stampHolder[0];
            System.out.println("Thread1 read value: " + value + ", stamp: " + stamp);

            // 阻塞1s
            LockSupport.parkNanos(1000000000L);
            // Thread1通过CAS修改value值为3   stamp是版本,每次修改可以通过+1保证版本唯一性
            if (atomicStampedReference.compareAndSet(value, 3,stamp,stamp+1)) {
                System.out.println("Thread1 update from " + value + " to 3");
            } else {
                System.out.println("Thread1 update fail!");
            }
        },"Thread1").start();

        new Thread(()->{
            int[] stampHolder = new int[1];
            int value = (int)atomicStampedReference.get(stampHolder);
            int stamp = stampHolder[0];
            System.out.println("Thread2 read value: " + value+ ", stamp: " + stamp);
            // Thread2通过CAS修改value值为2
            if (atomicStampedReference.compareAndSet(value, 2,stamp,stamp+1)) {
                System.out.println("Thread2 update from " + value + " to 2");

                // do something

                value = (int) atomicStampedReference.get(stampHolder);
                stamp = stampHolder[0];
                System.out.println("Thread2 read value: " + value+ ", stamp: " + stamp);
                // Thread2通过CAS修改value值为1
                if (atomicStampedReference.compareAndSet(value, 1,stamp,stamp+1)) {
                    System.out.println("Thread2 update from " + value + " to 1");
                }
            }
        },"Thread2").start();
    }
}  

执行结果

java并发线程深入理解CAS以及ABA问题的处理

总结:

本篇文章主要讲述CAS的加锁的实现方式,以及通过使用AtomicStampedRefrence和AtomicMarkableReference解决ABA的问题

作者:Jony_zhang
链接:

文章来源:智云一二三科技

文章标题:java并发线程深入理解CAS以及ABA问题的处理

文章地址:https://www.zhihuclub.com/183027.shtml

关于作者: 智云科技

热门文章

网站地图