请看如下代码:
public class TestAccount {
public static void main(String[] args) {
Account account = new AccountCas(10000);
Account.demo(account);
}
}
class AccountCas implements Account{
private AtomicInteger balance;
public AccountCas(int balance) {
this.balance = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
return balance.get();
}
@Override
public void withdraw(Integer amount) {
while (true) {
int prev = balance.get();
int next = prev - amount;
if(balance.compareAndSet(prev, next)) {
break;
}
}
}
}
interface Account {
Integer getBalance();
void withdraw(Integer amount);
static void demo(Account account) {
List<Thread> ts = new ArrayList<>();
for(int i=0; i<1000; i++) {
ts.add(new Thread(()->{
account.withdraw(10);
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
}catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(account.getBalance());
}
}
以上解决并发问题并没有使用锁来保护变量的线程安全。其中的compareAndSet,简称是CAS,它必须是原子操作。
在AtomicInteger源码中,我们可以看到使用了volatile来修饰属性
private volatile int value;
这是因为CAS必须借助volatile才能读取到共享变量的最新值来实现【比较并交换】的效果
为什么用CAS的效率高呢?在无锁情况下,即使重试失败,线程始终在高速运行,而synchronized会让线程在没有获得锁的情况下,发生上下文切换,进入阻塞。但是在无锁情况下,因为线程要保持运行,需要额外CPU的支持,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。
因为没有使用synchronized,所以线程不会陷入阻塞,这是效率提升的原因之一,但是如果竞争激烈,重试必然会频繁发生,反而会影响效率。
当前主线程只能判断出共享变量的值与最初值是否相同,不能判断在重新获取变量之前,是否被其他线程修改过。如果只要有其他线程修改过共享变量,那么自己的CAS操作就算失败,这个时候需要加一个版本号:
AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
只要修改变量了,就要在版本号上+1。
原子累加器
JDK8以后新增了几个累加器,比如LongAdder,性能比直接使用AtomicInteger高很多,性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Thread-0累加Cell[0],而Thread-1累加Cell[1]。。。最后将结果汇总。这样他们在累加时操作的不同的Cell变量,因此减少了CAS重试失败,从而提高了性能。
LongAdder源码分析
LongAdder类有几个关键变量
//累加单元数组,懒惰初始化
transient volatile Cell[] cells;
//基础值,如果没有竞争,则用cas累加这个域
transient volatile long base;
//在cells创建或扩容时,置为1,表示加锁
transient volatile int cellsBusy;
transient表示序列化时不会把信息进行序列化。
如果由我们自己来实现加锁,可能会写成下列的代码:
public class LockCas {
private AtomicInteger state = new AtomicInteger(0);
public void lock() {
while (true) {
if(state.compareAndSet(0, 1)) {
break;
}
}
}
public void unlock() {
state.set(0);
}
public static void main(String[] args){
LockCas lock = new LockCas();
new Thread(()->{
lock.lock();
try {
sleep(1);
}finally {
lock.unlock();
}
}).start();
}
}
但是可以发现,如果lock会耗费大量的资源来进行空循环,所以生产上肯定无法使用。接下来我们看看LongAdder的使用方法:
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
。。。
以上是累加单元Cell的源码,重要的就是cas方法。使用cas的方式进行累加,prev表示旧值,next表示新值。
@sun.misc.Contended是为了防止缓存行伪共享
我们先来看看缓存与内存的关系:
因为CPU与内存的速度差异很大,需要靠预读取数据至缓存来提升效率。缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中,CPU要保证数据的一致性,如果某个CPU核心更改了数据,其他CPU核心对应的整个缓存行必须失效。
@sun.misc.Contended的原理是在使用此注解的对象或字段的前后各增加128字节大小的padding,从而让CPU将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效。
我们查看LongAdder的源码,看到主要的add方法如下:
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
我们看到longAccumulate的源码逻辑如下图:
第一种情况:
第二种情况:
第三种情况:
当累加单元累加完毕,需要把累加单元中的数汇总起来,源码中使用了sum方法来操作:
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
Unsafe
Unsafe对象提供了非常底层,操作内存、线程的方法,Unsafe对象不能直接调用,只能通过反射获得:
public class UnsafeAccessor {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe)theUnsafe.get(null);
}catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}
static Unsafe getUnsafe() {
return unsafe;
}
}
数据库连接池
public class Pool {
private int poolSize;
//连接对象数组
private Connection[] connections;
//连接状态数组 0:空闲 1:繁忙
private AtomicIntegerArray states;
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for(int i=0; i<poolSize; i++) {
connections[i] = new MockConnection();
}
}
//借连接
public Connection borrow() {
while(true) {
for(int i=0; i<poolSize; i++) {
if(states.get(i) == 0) {
if(states.compareAndSet(i, 0, 1)){
return connections[i];
}
}
}
//如果没有空闲连接,让当前线程进入等待
synchronized (this) {
try {
this.wait();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//归还连接
public void free(Connection conn) {
for(int i=0; i<poolSize; i++) {
if(connections[i] == conn) {
states.set(i, 0);
synchronized (this) {
this.notifyAll();
}
break;
}
}
}
}