博主
258
258
258
258
专辑

第十节 CAS简述

亮子 2021-10-14 14:04:11 6885 0 0 0

01 CAS简介

首先我们来看一下 CAS 是什么,它的英文全称是 Compare-And-Swap,中文叫做“比较并交换”,它是一种思想、一种算法。

在多线程的情况下,各个代码的执行顺序是不能确定的,所以为了保证并发安全,我们可以使用互斥锁。而 CAS 的特点是避免使用互斥锁,当多个线程同时使用 CAS 更新同一个变量时,只有其中一个线程能够操作成功,而其他线程都会更新失败。不过和同步互斥锁不同的是,更新失败的线程并不会被阻塞,而是被告知这次由于竞争而导致的操作失败,但还可以再次尝试。

CAS 被广泛应用在并发编程领域中,以实现那些不会被打断的数据交换操作,从而就实现了无锁的线程安全。

02 CAS的思路

在大多数处理器的指令中,都会实现 CAS 相关的指令,这一条指令就可以完成“比较并交换”的操作,也正是由于这是一条(而不是多条)CPU 指令,所以 CAS 相关的指令是具备原子性的,这个组合操作在执行期间不会被打断,这样就能保证并发安全。由于这个原子性是由 CPU 保证的,所以无需我们程序员来操心。

CAS 有三个操作数:内存值 V、预期值 A、要修改的值 B。CAS 最核心的思路就是,仅当预期值 A 和当前的内存值 V 相同时,才将内存值修改为 B。

我们对此展开描述一下:CAS 会提前假定当前内存值 V 应该等于值 A,而值 A 往往是之前读取到当时的内存值 V。在执行 CAS 时,如果发现当前的内存值 V 恰好是值 A 的话,那 CAS 就会把内存值 V 改成值 B,而值 B 往往是在拿到值 A 后,在值 A 的基础上经过计算而得到的。如果执行 CAS 时发现此时内存值 V 不等于值 A,则说明在刚才计算 B 的期间内,内存值已经被其他线程修改过了,那么本次 CAS 就不应该再修改了,可以避免多人同时修改导致出错。这就是 CAS 的主要思路和流程。

JDK 正是利用了这些 CAS 指令,可以实现并发的数据结构,比如 AtomicInteger 等原子类。

利用 CAS 实现的无锁算法,就像我们谈判的时候,用一种非常乐观的方式去协商,彼此之间很友好,这次没谈成,还可以重试。CAS 的思路和之前的互斥锁是两种完全不同的思路,如果是互斥锁,不存在协商机制,大家都会尝试抢占资源,如果抢到了,在操作完成前,会把这个资源牢牢的攥在自己的手里。当然,利用 CAS 和利用互斥锁,都可以保证并发安全,它们是实现同一目标的不同手段。

例子
下面我们用图解和例子的方式,让 CAS 的过程变得更加清晰,如下图所示:

图片alt

假设有两个线程,分别使用两个 CPU,它们都想利用 CAS 来改变右边的变量的值。我们先来看线程 1,它使用 CPU 1,假设它先执行,它期望当前的值是 100,并且想将其改成 150。在执行的时候,它会去检查当前的值是不是 100,发现真的是 100,所以可以改动成功,而当改完之后,右边的值就会从 100 变成 150。

图片alt

如上图所示,假设现在才刚刚轮到线程 2 所使用的 CPU 2 来执行,它想要把这个值从 100 改成 200,所以它也希望当前值是 100,可实际上当前值是 150,所以它会发现当前值不是自己期望的值,所以并不会真正的去继续把 100 改成 200,也就是说整个操作都是没有效果的,此次没有修改成功,CAS 操作失败。

当然,接下来线程 2 还可以有其他的操作,这需要根据业务需求来决定,比如重试、报错或者干脆跳过执行。举一个例子,在秒杀场景下,多个线程同时执行秒杀,只要有一个执行成功就够了,剩下的线程当发现自己 CAS 失败了,其实说明兄弟线程执行成功了,也就没有必要继续执行了,这就是跳过操作。所以业务逻辑不同,就会有不同的处理方法,但无论后续怎么处理,之前的那一次 CAS 操作是已经失败了的。

03 CAS的语义

我们来看一看 CAS 的语义,有了下面的等价代码之后,理解起来会比前面的图示和文字更加容易,因为代码实际上是一目了然的。接下来我们把 CAS 拆开,看看它内部究竟做了哪些事情。CAS 的等价语义的代码,如下所示

/**
 * 描述:     模拟CAS操作,等价代码
 */
 
public class SimulatedCAS {

    private int value;

    public synchronized int compareAndSwap(int expectedValue, int newValue) {
        int oldValue = value;
        if (oldValue == expectedValue) {
            value = newValue;
        }
        return oldValue;
    }
}

在这段代码中有一个 compareAndSwap 方法,在这个方法里有两个入参,第 1 个入参期望值 expectedValue,第 2 个入参是 newValue,它就是我们计算好的新的值,我们希望把这个新的值去更新到变量上去。

你一定注意到了, compareAndSwap 方法是被 synchronized 修饰的,我们用同步方法为 CAS 的等价代码保证了原子性。

接下来我将讲解,在 compareAndSwap 方法里都做了哪些事情。需要先拿到变量的当前值,所以代码里用就会用 int oldValue = value 把变量的当前值拿到。然后就是 compare,也就是“比较”,所以此时会用 if (oldValue == expectedValue) 把当前值和期望值进行比较,如果它们是相等的话,那就意味着现在的值正好就是我们所期望的值,满足条件,说明此时可以进行 swap,也就是交换,所以就把 value 的值修改成 newValue,最后再返回 oldValue,完成了整个 CAS 过程。

CAS 最核心的思想就在上面这个流程中体现了,可以看出,compare 指的就是 if 里的比较,比较 oldValue 是否等于 expectedValue;同样,swap 实际上就是把 value 改成 newValue,并且返回 oldValue。所以这整个 compareAndSwap 方法就还原了 CAS 的语义,也象征了 CAS 指令在背后所做的工作。

测试代码:

/**
 * @author WGR
 * @create 2021/1/6 -- 22:59
 */
public class TestAccount {
    public static void main(String[] args) {
        Account account = new AccountUnsafe(10000);
        Account.demo(account);
    }
}

class AccountCas implements Account {
    private AtomicInteger balance;

    public AccountCas(int balance) {
        this.balance = new AtomicInteger(balance);
    }

    @Override
    public Integer getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(Integer amount) {
        /*while(true) {
            // 获取余额的最新值
            int prev = balance.get();
            // 要修改的余额
            int next = prev - amount;
            // 真正修改
            if(balance.compareAndSet(prev, next)) {
                break;
            }
        }*/
        balance.getAndAdd(-1 * amount);
    }
}

class AccountUnsafe implements Account {

    private Integer balance;

    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }

    @Override
    public Integer getBalance() {
       // synchronized (this) {
            return this.balance;
      //  }
    }

    @Override
    public void withdraw(Integer amount) {
    //    synchronized (this) {
            this.balance -= amount;
      //  }
    }
}

interface Account {
    // 获取余额
    Integer getBalance();

    // 取款
    void withdraw(Integer amount);

    /**
     * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
     * 如果初始余额为 10000 那么正确的结果应当是 0
     */
    static void demo(Account account) {
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        long start = System.nanoTime();
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance()
                + " cost: " + (end-start)/1000_000 + " ms");
    }
}

如果不加锁,会出现并发问题

图片alt

进行先编译查看,其实是由多条指令的,多线程操作不能保证它的原子性。

图片alt

解决方法都在刚刚的图中,要不加锁,要不用原子类。

  • volatile
    获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。

注意

volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原子性)CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果

图片alt

04 为什么无锁效率高

无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻
线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

图片alt

05 CAS的特点

  • 结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
  • CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
  • synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
  • CAS 体现的是无锁并发、无阻塞并发,因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一,但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

06 原子类

001 原子整数

AtomicBoolean,AtomicInteger,AtomicLong
以 AtomicInteger 为例

/**
 * @author WGR
 * @create 2021/1/6 -- 23:20
 */
public class Test34 {
    public static void main(String[] args) {

        AtomicInteger i = new AtomicInteger(5);

        /*System.out.println(i.incrementAndGet()); // ++i   1
        System.out.println(i.getAndIncrement()); // i++   2

        System.out.println(i.getAndAdd(5)); // 2 , 7
        System.out.println(i.addAndGet(5)); // 12, 12*/

        //             读取到    设置值
//        i.updateAndGet(value -> value * 10);

        System.out.println(updateAndGet(i, p -> p / 2));

//        i.getAndUpdate()
        System.out.println(i.get());
    }

    public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator) {
        while (true) {
            int prev = i.get();
            int next = operator.applyAsInt(prev);
            if (i.compareAndSet(prev, next)) {
                return next;
            }
        }
    }
}

002 原子引用:ABA问题

AtomicReference,AtomicMarkableReference,AtomicStampedReference

/**
 * @author WGR
 * @create 2021/1/6 -- 23:33
 */
@Slf4j(topic = "c.Test35")
public class Test35 {

    static AtomicReference<String> ref = new AtomicReference<>("A");

    public static void main(String[] args) {
        String prev = ref.get();
        other();
        sleep(1);
        log.debug("change A->C {}", ref.compareAndSet(ref.get(), "C"));
    }

    private static void other() {
        new Thread(() -> {
            log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
        }, "t1").start();
        sleep(0.5);
        new Thread(() -> {
            log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
        }, "t2").start();
    }
}

图片alt

主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程希望:
只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号

  • 解决ABA问题
/**
 * @author WGR
 * @create 2021/1/6 -- 23:31
 */
@Slf4j(topic = "c.Test36")
public class Test36 {

    static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

    public static void main(String[] args) throws InterruptedException {
        log.debug("main start...");
        // 获取值 A
        String prev = ref.getReference();
        // 获取版本号
        int stamp = ref.getStamp();
        log.debug("版本 {}", stamp);
        // 如果中间有其它线程干扰,发生了 ABA 现象
        other();
        sleep(1);
        // 尝试改为 C
        log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
    }

    private static void other() {
        new Thread(() -> {
            log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1));
            log.debug("更新版本为 {}", ref.getStamp());
        }, "t1").start();
        sleep(0.5);
        new Thread(() -> {
            log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1));
            log.debug("更新版本为 {}", ref.getStamp());
        }, "t2").start();
    }
}

图片alt

AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A ->C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference

/**
 * @author WGR
 * @create 2021/1/6 -- 23:48
 */
@Slf4j(topic = "c.Test38")
public class Test38 {
    public static void main(String[] args) throws InterruptedException {
        GarbageBag bag = new GarbageBag("装满了垃圾");
        // 参数2 mark 可以看作一个标记,表示垃圾袋满了
        AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);

        log.debug("start...");
        GarbageBag prev = ref.getReference();
        log.debug(prev.toString());

        new Thread(() -> {
            log.debug("start...");
            bag.setDesc("空垃圾袋");
            ref.compareAndSet(bag, bag, true, false);
            log.debug(bag.toString());
        },"保洁阿姨").start();

        sleep(1);
        log.debug("想换一只新垃圾袋?");
        boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
        log.debug("换了么?" + success);
        log.debug(ref.getReference().toString());
    }
}

class GarbageBag {
    String desc;

    public GarbageBag(String desc) {
        this.desc = desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return super.toString() + " " + desc;
    }
}

图片alt

003 原子数组

AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray

/**
 * @author WGR
 * @create 2021/1/6 -- 23:57
 */
public class Test39 {

    public static void main(String[] args) {
        demo(
                ()->new int[10],
                (array)->array.length,
                (array, index) -> array[index]++,
                array-> System.out.println(Arrays.toString(array))
        );

        demo(
                ()-> new AtomicIntegerArray(10),
                (array) -> array.length(),
                (array, index) -> array.getAndIncrement(index),
                array -> System.out.println(array)
        );
    }

    /**
     参数1,提供数组、可以是线程不安全数组或线程安全数组
     参数2,获取数组长度的方法
     参数3,自增方法,回传 array, index
     参数4,打印数组的方法
     */
    // supplier 提供者 无中生有  ()->结果
    // function 函数   一个参数一个结果   (参数)->结果  ,  BiFunction (参数1,参数2)->结果
    // consumer 消费者 一个参数没结果  (参数)->void,      BiConsumer (参数1,参数2)->
    private static <T> void demo(
            Supplier<T> arraySupplier,
            Function<T, Integer> lengthFun,
            BiConsumer<T, Integer> putConsumer,
            Consumer<T> printConsumer ) {
        List<Thread> ts = new ArrayList<>();
        T array = arraySupplier.get();
        int length = lengthFun.apply(array);
        for (int i = 0; i < length; i++) {
            // 每个线程对数组作 10000 次操作
            ts.add(new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    putConsumer.accept(array, j%length);
                }
            }));
        }

        ts.forEach(t -> t.start()); // 启动所有线程
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });     // 等所有线程结束
        printConsumer.accept(array);
    }
}

图片alt

004 字段更新器

AtomicReferenceFieldUpdater,AtomicIntegerFieldUpdater,AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现
异常

/**
 * @author WGR
 * @create 2021/1/7 -- 0:07
 */
@Slf4j(topic = "c.Test40")
public class Test40 {

    public static void main(String[] args) {
        Student stu = new Student();

        AtomicReferenceFieldUpdater updater =
                AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");

        System.out.println(updater.compareAndSet(stu, null, "张三"));
        System.out.println(stu);
    }
}

class Student {
    volatile String name;

    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                '}';
    }
}

如果不加volatile,就会报错。

图片alt

005 原子累加器

/**
 * @author WGR
 * @create 2021/1/7 -- 0:15
 */
public class Test41 {
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            demo(
                    () -> new AtomicLong(0),
                    (adder) -> adder.getAndIncrement()
            );
        }

        for (int i = 0; i < 5; i++) {
            demo(
                    () -> new LongAdder(),
                    adder -> adder.increment()
            );
        }
    }

    /*
    () -> 结果    提供累加器对象
    (参数) ->     执行累加操作
     */
    private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
        T adder = adderSupplier.get();
        List<Thread> ts = new ArrayList<>();
        // 4 个线程,每人累加 50 万
        for (int i = 0; i < 4; i++) {
            ts.add(new Thread(() -> {
                for (int j = 0; j < 500000; j++) {
                    action.accept(adder);
                }
            }));
        }
        long start = System.nanoTime();
        ts.forEach(t -> t.start());
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        long end = System.nanoTime();
        System.out.println(adder + " cost:" + (end - start) / 1000_000);
    }
}

图片alt

性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

006 LongAdder

LongAdder 类有几个关键域

// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;

007 伪共享

其中 Cell 即为累加单元

// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// 省略不重要代码
}

图片alt

图片alt

因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效。

图片alt

因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象。这样问题来了:Core-0 要修改 Cell[0],Core-1 要修改 Cell[1]
无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加Cell[0]=6001, Cell[1]=8000,这时会让 Core-1 的缓存行失效。
@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效

图片alt

public void add(long x) {
// as 为累加单元数组
// b 为基础值
// x 为累加值
        Cell[] as; long b, v; int m; Cell a;
// 进入 if 的两个条件
// 1. as 有值, 表示已经发生过竞争, 进入 if
// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if
        if ((as = cells) != null || !casBase(b = base, b + x)) {
// uncontended 表示 cell 没有竞争
            boolean uncontended = true;
            if (
// as 还没有创建
                    as == null || (m = as.length - 1) < 0 ||
// 当前线程对应的 cell 还没有
                            (a = as[getProbe() & m]) == null ||
// cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )
                            !(uncontended = a.cas(v = a.value, v + x))
            ) {
// 进入 cell 数组创建、cell 创建的流程
                longAccumulate(x, null, uncontended);
            }
        }
    }

图片alt

    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
// 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cell
        if ((h = getProbe()) == 0) {
// 初始化 probe
            ThreadLocalRandom.current();
// h 对应新的 probe 值, 用来对应 cell
            h = getProbe();
            wasUncontended = true;
        }
// collide 为 true 表示需要扩容
        boolean collide = false;
        for (;;) {
            Cell[] as; Cell a; int n; long v;
// 已经有了 cells
            if ((as = cells) != null && (n = as.length) > 0) {
// 还没有 cell
                if ((a = as[(n - 1) & h]) == null) {
// 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x
// 成功则 break, 否则继续 continue 循环
                }
// 有竞争, 改变线程对应的 cell 来重试 cas
                else if (!wasUncontended)
                    wasUncontended = true;
// cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 null
                else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                    break;
// 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 cas
                else if (n >= NCPU || cells != as)
                    collide = false;
// 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了
                else if (!collide)
                    collide = true;
// 加锁
                else if (cellsBusy == 0 && casCellsBusy()) {
// 加锁成功, 扩容
                    continue;
                }
// 改变线程对应的 cell
                h = advanceProbe(h);
            }
// 还没有 cells, 尝试给 cellsBusy 加锁
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell
// 成功则 break;
            }
// 上两种情况失败, 尝试给 base 累加
            else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                break;
        }
    }

图片alt

图片alt

每个线程刚进入 longAccumulate 时,会尝试对应一个 cell 对象(找到一个坑位)

图片alt

008 Unsafe

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得

/**
 * @author WGR
 * @create 2021/1/7 -- 10:37
 */
public class TestUnsafe {

    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        Unsafe unsafe = (Unsafe) theUnsafe.get(null);

        System.out.println(unsafe);

        // 1. 获取域的偏移地址
        long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
        long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));

        Teacher t = new Teacher();
        // 2. 执行 cas 操作
        unsafe.compareAndSwapInt(t, idOffset, 0, 1);
        unsafe.compareAndSwapObject(t, nameOffset, null, "张三");

        // 3. 验证
        System.out.println(t);
    }
}
@Data
class Teacher {
    volatile int id;
    volatile String name;
}

图片alt

/**
 * @author WGR
 * @create 2021/1/7 -- 10:47
 */
@Slf4j(topic = "c.Test42")
public class Test42 {
    public static void main(String[] args) {
        Account.demo(new MyAtomicInteger(10000));
    }
}

class MyAtomicInteger implements Account {
    private volatile int value;
    private static final long valueOffset;
    private static final Unsafe UNSAFE;
    static {
        UNSAFE = UnsafeAccessor.getUnsafe();
        try {
            valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    public int getValue() {
        return value;
    }

    public void decrement(int amount) {
        while(true) {
            int prev = this.value;
            int next = prev - amount;
            if (UNSAFE.compareAndSwapInt(this, valueOffset, prev, next)) {
                break;
            }
        }
    }

    public MyAtomicInteger(int value) {
        this.value = value;
    }

    @Override
    public Integer getBalance() {
        return getValue();
    }

    @Override
    public void withdraw(Integer amount) {
        decrement(amount);
    }
}

图片alt

07 CAS缺点

除了上面说的ABA问题,还有下面2点。

  • 自旋时间过长
    CAS 的第二个缺点就是自旋时间过长。

由于单次 CAS 不一定能执行成功,所以 CAS 往往是配合着循环来实现的,有的时候甚至是死循环,不停地进行重试,直到线程竞争不激烈的时候,才能修改成功。

可是如果我们的应用场景本身就是高并发的场景,就有可能导致 CAS 一直都操作不成功,这样的话,循环时间就会越来越长。而且在此期间,CPU 资源也是一直在被消耗的,这会对性能产生很大的影响。所以这就要求我们,要根据实际情况来选择是否使用 CAS,在高并发的场景下,通常 CAS 的效率是不高的。

  • 范围不能灵活控制
    CAS 的第三个缺点就是不能灵活控制线程安全的范围。

通常我们去执行 CAS 的时候,是针对某一个,而不是多个共享变量的,这个变量可能是 Integer 类型,也有可能是 Long 类型、对象类型等等,但是我们不能针对多个共享变量同时进行 CAS 操作,因为这多个变量之间是独立的,简单的把原子操作组合到一起,并不具备原子性。因此如果我们想对多个对象同时进行 CAS 操作并想保证线程安全的话,是比较困难的。

有一个解决方案,那就是利用一个新的类,来整合刚才这一组共享变量,这个新的类中的多个成员变量就是刚才的那多个共享变量,然后再利用 atomic 包中的 AtomicReference 来把这个新对象整体进行 CAS 操作,这样就可以保证线程安全。

参考文档