您的位置 首页 java

Java并发工具AbstractQueuedSynchronizer实现详解

作者:吴潇 职位:java软件工程师

原创声明

这是本人署名原创文章,未经许可不允许转载。另外,这也是本人劳动成果,请勿抄袭。本公众号的所有文章均为本人原创,如果您比较感兴趣欢迎关注!
 

AbstractQueuedSynchronizer(以下简称AQS或AQS锁)是ReentrantLock的底层实现,它提供了自旋、FIFO线程等待队列和阻塞等功能。Java常见并发同步工具如Semaphore、CountDownLatch、ReentrantLock等都是基于AQS实现的。

AQS的实现要点总结如下:

1. 用一个原子int变量代表同步状态

AQS内部有一个原子int变量(命名为state),它是AQS的核心状态,也是唯一跟同步有关的变量。例如,ReentrantLock中state≠0表示锁已被占,state=0表示锁空闲。AQS的子类负责赋予state具体含义,通过覆写tryAcquire(), tryRelease(), tryAcquireShared(), tryReleaseShared(), isHeldExclusively()来通过以乐观锁方式对state进行操作。并且,state只允许通过getState(), setState(), compareAndSetState()对其操作以保证可见性和原子性。

2. 用一个CLH队列存放等待线程,每个线程一个结点,分独占模式和共享模式。

CLH队列(其实是CLH队列的变种)是用于实现自旋锁的队列数据结构,主体是一个链表,如下图所示。

CLH队列

在CLH队列中,每个线程的等待状态(waitState变量)保存在前一个结点中,取值可以是 1 (CANCELED,线程已经放弃等待),-1(SIGNAL,线程已经被阻塞、需要被唤醒),-2(CONDITION,线程在条件队列中等待,这种状态的结点不可能出现在CLH队列,只会出现在Condition条件队列上),-3(PROPAGATE,下一个在共享模式中等待的线程无条件唤醒,即唤醒动作可以继续往队列后面接力),0(就绪,这个状态代表线程没有被阻塞,并且准备好请求锁)。

3. 通过循环+CAS操作来对CLH队列进行修改

因为AQS本身就是用于实现锁的,所以AQS中的CLH队列没有锁来保护,且必须支持并发修改。怎么办?通过循环结合CAS操作来实现CLH队列操作的线程安全性。例如enq(),

private Node enq( final Node node) { for (;;) { Node t = tail ; if (t == null ) { // Must initialize if (compareAndSetHead( new Node())) tail = head ; } else { node. prev = t; if (compareAndSetTail(t, node)) { t. next = node; return t; } } }}

4. 加锁操作如何实现

与CLH队列操作很类似,加锁操作也是通过循环+CAS操作来实现,不过还使用到了让线程阻塞的方法LockSupport.park()。我们以不可中断的加锁操作为例,讲解其主要实现逻辑如下:在一个死循环中,首先调用乐观锁加锁操作 tryAcquire(),如果成功,则加锁操作直接返回;如果失败了,则判断当前线程结点的前一结点的waitState:如果等于SIGNAL,则直接进入阻塞(说明此时已经有前一个结点得到锁了),如果是CANCELED,则清理一次CLH队列(把已经取消等待的线程中队列中移除)再次执行循环,如果是0或PROPAGATION,则把状态改为SIGNAL,继续执行循环。如果前面的阻塞操作被其他线程唤醒了,再次执行循环。参考代码:

final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
 boolean interrupted = false;
 for (;;) {
 final Node p = node.predecessor();
 if (p == head && tryAcquire(arg)) {
 setHead(node);
 p.next = null; // help GC
 failed = false;
 return interrupted;
 }
 if (shouldParkAfterFailedAcquire(p, node) &&
 parkAndCheckInterrupt())
 interrupted = true;
 }
 } finally {
 if (failed)
 cancelAcquire(node);
 }
}
 

加锁操作其实有计时版本、可中断版本,但大体逻辑就是上面这样,只不过在循环中再进行了一些时间判断、中断标志判断等,并体现在返回结果或抛出异常上。

总结一下,AQS的加锁操作就是在一个循环中,不断执行CAS加锁(成功则返回,失败则继续),然后不断阻塞和被唤醒(在每次被唤醒的时候,顺便执行一些CLH队列清理工作),再次执行CAS加锁。

5. 加锁操作优化

AQS中的加锁操作,即acquire(),进行了一种barge优化。意思就是,当锁被释放出来以后,此时正好有一个线程请求锁,而队列中的第一个线程也被唤醒并且请求锁,这两个线程谁将获得锁是不确定的。这种优化对AQS锁的并发性有提升,但是使得它变成不公平的锁(没有按照FIFO原则操作)。

public final void acquire(int arg) {
 if (!tryAcquire(arg) &&
 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 selfInterrupt();
}
 

6. 解锁操作如何实现

解锁操作的实现比较简单,先调用一个在子类中实现的CAS解锁操作(tryRelease),如果成功,则把队列中的第一个结点移出队列,判断是否需要唤醒下一个线程,需要则唤醒。

public final boolean release(int arg) {
 if (tryRelease(arg)) {
 Node h = head;
 if (h != null && h.waitStatus != 0)
 unparkSuccessor(h);
 return true;
 }
 return false;
}
 

总结

以上基本把AQS的主要实现逻辑总结完了。AQS的主体逻辑是用循环+CAS操作CLH队列,保证并发访问CLH队列的线程安全性。AQS加锁(支持阻塞)和AQS解锁操作依赖在子类中实现的不支持阻塞的CAS加锁操作(tryAcquire)和CAS解锁操作(tryRelease)实现。加锁操作也是在一个循环中,不断调用tryAcquire实现CAS加锁操作、阻塞唤醒,直到tryAcquire成功或者线程主动取消等待。

文章来源:智云一二三科技

文章标题:Java并发工具AbstractQueuedSynchronizer实现详解

文章地址:https://www.zhihuclub.com/183296.shtml

关于作者: 智云科技

热门文章

网站地图