您的位置 首页 java

自定义 Java 同步工具

来源:公号|一个八月想偷懒的开发坑 , 
作者 他的
 

前言

如果类库没有提供需求的功能,可以通过类库提供的底层机制来构造自己的同步机制,包括内置的条件队列、显式的 Condition 对象以及

AbstractQueuedSynchronizer 框架,实现状态依赖性的各种选择,以及在使用平台提供的状态依赖性机制如何遵守各项规则。

状态依赖性的管理

在编写顺序程序中的类时,要使得这些类在它们的前提条件未被满足时就失败。但在并发程序中,基于状态的条件可能会由于其他 线程 的操作而改变:一个资源池可能在几条指令之前还是空的,但现在却变为非空的,因为另一条线程可能会返回一个元素到资源池。对于并发对象上依赖的方法,虽然有时候在前提条件不满足的情况下不会失败,但通常有一种更好的选择,即等待前提条件变为真。为了突出高效的条件等待机制的价值,先通过轮询与休眠等方式来(勉强地)解决状态依赖性问题。

在生产者-消费者的设计中经常会使用像

ArrayB Lock ingQueue 这样的有界 缓存 。在有界缓存提供的 put 和 take 操作中都包含了一个前提条件:不能从空缓存中获取元素,也不能将元素放入已满的缓存中。当前提条件未满足时,依赖状态的操作可以抛出一个异常或返回一个错误状态(使其成为调用者的一个问题),也可以保持阻塞直到对象进入正确的状态。

有界缓存的几种实现,其中将采用不同的方法来处理前提条件失败的问题。

BaseBoundedBuffer 实现了一个基于数组的循环缓存;其中各个缓存状态变量(buf、head、tail 和 count)均由缓存的内置锁来保护。它还提供了同步的 doPut 和 doTake 方法,并在子类中通过这些方法来实现 put 和 take 操作,底层的状态将对子类隐藏。

自定义 Java 同步工具

自定义 Java 同步工具

示例:将前提条件的失败传递给调用者

GrumpyBoundedBuffer 是第一个简单的有界缓存实现。put 和 take 方法都进行了同步以确保实现对缓存状态的独占访问,因为这两个方法在访问缓存时都采用 “先检查再运行” 的逻辑策略。

自定义 Java 同步工具

“缓存已满” 并不是有界缓存的一个异常条件,就像 “红灯” 并不表示交通信号灯出现了异常。在实现缓存时得到的简化(使使用者管理状态依赖性)并不能抵消在使用时存在的复杂性,因为现在调用者必须做好捕获异常的准备,并在每次缓存操作时都需要重试。(将导致一些功能无法实现,例如维持 FIFO 顺序,由于迫使调用者重试,因此失去了谁先到达的信息)

自定义 Java 同步工具

当缓存处于某种错误的状态时返回一个错误值,这是一种改进,因为并没有放弃异常机制,抛出的异常意味着再试一次,但这种方法并没有解决根本问题:调用者必须自行处理前提条件失败的情况。(Queue 提供了上述两种选择,即 poll 方法能够在队列为空时返回 null,而 remove 方法则抛出一个异常,但 Queue 并不适合在生产者-消费者设计中使用。BlockingQueue 中的操作只有当队列处于正确状态时才会进行处理,否则将阻塞,因此当生产者和消费者并发执行时,BlockingQueue 才是更好的选择)

客户代码不是实现重试的唯一方式。调用者可以不进入休眠状态,而直接重新调用 take 方法,这种方法被称为忙等待或自旋等待。如果缓存的状态在很长一段时间内都不会发生变化,那么使用这种方法就会消耗大量的 CPU 时间。但是,调用者也可以进入休眠状态来避免消耗过多的 CPU 时间,但如果缓存的状态在刚调用完 sleep 就立即发生变化,那么将不必要地休眠一段时间。因此,客户代码必须要在二者之间进行选择:要么容忍自旋导致的 CPU 时钟周期浪费,要么容忍由于休眠而导致的低响应性。(除了忙等待与休眠之外,还有一种选择就是调用 Thread.yield,这相当于给调度器一个提示:现在需要让出一定的时间使另一个线程运行。假如正在等待另一个线程执行工作,那么如果选择让出处理器而不是消耗完整个 CPU 调度时间片,那么可以使整体的执行过程变快。)

示例:通过轮询与休眠来实现简单的阻塞

SleepyBoundedBuffer 尝试通过 put 和 take 方法来实现一种简单的 “轮询与休眠” 重试机制,从而使调用者无须在每次调用时都实现重试逻辑。如果缓存为空,那么 take 将休眠并直到另一个线程在缓存中放入一些数据;如果缓存是满的,那么 put 将休眠直到另一个线程从缓存中移除一些数据,以便有空间容纳新的数据。这种方法将前提条件的管理操作封装起来,并简化了对缓存的使用。

自定义 Java 同步工具

缓存代码必须在持有缓存锁时才能测试相应的状态条件,因为表示状态条件的变量是由缓存锁保护的。如果测试失败,那么当前执行的线程将首先释放并休眠一段时间,从而使其他线程能够访问缓存(通常,如果线程在休眠或者被阻塞时持有一个锁,那么通常是一种不好的做法,因为只要线程不释放这个锁,有些条件例如缓存为满 / 空就永远无法为真)。当线程醒来时,它将重新请求锁并在此尝试执行操作,因而线程将反复地在休眠以及测试状态条件等过程之间进行切换,直到可以执行操作为止。

要选择合适的休眠时间间隔,就需要在响应性与 CPU 使用率之间进行权衡。休眠间隔越小,响应性越高,但消耗的 CPU 资源也越高。在缓存中出现可用空间的时刻与线程醒来并再次检查的时刻之间可能存在延迟,例如在线程刚刚进入休眠后,条件立即为真,此时将存在不必要的休眠时间。

SleepyBoundedBuffer 对调用者提出了一个新的需求:处理InterruptedException。当一个方法由于等待某个条件为真而阻塞时,需要提供一种取消机制。与大多数具备良好行为的阻塞方法一样,SleepyBoundedBuffer 通过中断来支持取消,如果该方法被中断,将提前返回并抛出 InterruptedException。如果存在某种挂起线程的方法,并且这种方法能够确保当某个条件为真时线程立即醒来,将极大地简化实现工作,这正是条件队列实现的功能。

条件队列

条件队列就好像烤面包机中通知 “面包已烤好” 的铃声。如果你注意听铃声,那么当面包烤好后可以立刻得到通知。如果没有听见铃声(可能出去拿报纸),那么会错过通知信息,但回到厨房时还可以观察面包机的状态,如果已经烤好,就取出面包,如果未烤好,就再次留意铃声。

条件队列,是使得一组线程(等待线程集合)能够通过某种方式来等待特定的条件为真。传统队列的元素时一个个数据,而与之不同是条件队列中的元素是一个个正在等待相关条件的线程。

正如每个 Java 对象都可以作为一个锁,每个对象同样可以作为一个条件队列,并且 Object 中的 wait、notify 和 notifyAll 方法就构成了内部条件队列的 API。对象的内置锁与其内部条件队列时互相关联的,要调用条件 X 中条件队列的任何一个方法,必须持有对象 X 上的锁。这是因为 “等待由状态构成的条件” 与 “维护状态一致性” 这两种机制必须被紧密地绑定在一起:只有能对状态进行检查时,才能在某个条件上等待,并且只有能修改状态时,才能从条件等待中释放另一个线程。

Object.wait 会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并修改对象的状态。当被挂起的线程醒来时,它将在返回之前重新获取锁。从直观上来理解,调用 wait 意味着 “休息,当发生特定事件时唤醒我”,而调用通知方法就意味着 “特定事件发生了”。

BoundedBuffer 中使用了 wait 和 notifyAll 来实现一个有界缓存,比使用休眠的有界缓存更简单,并且更高效(当缓存状态没有发生变化时,线程醒来的次数将更少),响应性也更高(当发生特定状态变化时将立即醒来)。这是一个较大的改进,但要注意:与使用休眠的有界缓存相比,条件队列并没有改变原来的语义。只是在多个方面进行优化:CPU 效率、上下文切换开销和响应性等。如果某个功能无法通过 “轮询和休眠” 来实现,那么使用条件队列也无法实现(一个公平的条件队列可以确保线程按照顺序从等待集合中释放,与内置锁相同,内置条件队列并不提供公平的排队操作,而在显式的 Condition 却可以提供公平或非公平的排队操作),但条件队列使得在表达和管理状态依赖性时更加简单和高效。

自定义 Java 同步工具

BoundedBuffer 变得足够好了,不仅简单易用,而且实现了明晰的状态依赖性管理。在产品的正式版本中还应包括限时版本的 put 和 take,这样当阻塞操作不能再预计时间内完成时,可以因超时而返回。通过使用定时版本的 Object.wait,可以很容易实现这些方法。

使用条件队列

虽然许多规则都能确保正确地使用条件队列,但在编译器或系统平台上却并没有强制要求遵循这些规则。(这也是为什么要尽量基于 LinkedBlockingQueue、Latch、 Semaphore 和 FutureTask 等类来构造程序的原因之一,如果能避免使用条件队列,那么实现起来将容易许多)

条件谓词

要想正确地使用条件队列,关键是找出对象在哪个条件谓词上等待。如果没有条件谓词,条件等待机制将无法发挥作用。条件谓词是使某个操作成为状态依赖操作的前提条件。对 take 方法来说,它的条件谓词就是 “缓存不为空”,take 方法在执行之前必须首先测试该条件谓词。

在条件等待中存在一种重要的三元关系,包括加锁、wait 方法和一个条件谓词。在条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象与条件队列对象(即调用 wait 和 notify 等方法所在的对象)必须是同一个对象。

每一次 wait 调用都会隐式地与特定的条件谓词关联起来。当调用某个特定条件谓词的 wait 时,调用者必须已经持有与条件队列相关的锁,并且这个锁必须保护着构成条件谓词的状态变量。

过早唤醒

内置条件队列可以与多个条件谓词一起使用。当一个线程由于调用 notifyAll 而醒来时,并不意味该线程正在等待的条件谓词已经变成真了。(这就像烤面包机和咖啡机共用一个铃声,当响铃后,你必须查看是哪个设备发出的铃声。)另外,wait 方法还可以 “假装” 返回,而不是由于某个线程调用了 notify。

当执行控制重新进入调用 wait 的代码时,它已经重新获取了与条件队列相关联的锁。现在条件谓词是不是已经变为真了?或许。在发出通知的线程调用 notifyAll 时,条件谓词可能已经变成真,但在重新获取锁时将再次变为假。在线程被唤醒到 wait 重新获取锁的这段时间里,可能有其他线程已经获取了这个锁,并修改了对象的状态。或者,条件谓词从调用 wait 起根本就没有变成真。你并不知道另一个线程为什么调用 notify 或 notifyAll,也许是因为与同一条件队列相关的另一个条件谓词变成了真。”一个条件队列与多个条件谓词相关” 是一种常见的情况——在 BoundedBuffer 中使用的条件队列与 “非满” 和 “非空” 两个条件谓词相关(线程可能同时在 “非满” 与 “非空” 这两个条件谓词上等待。当生产者 / 消费者的数量超过缓存的容量时,就会出现这种情况)。

基于所有这些原因,每当线程从 wait 中唤醒时,都必须再次测试条件谓词,如果条件谓词不为真,那么就继续等待(或者失败)。由于线程在条件谓词不为真的情况下也可以反复地醒来,因此必须在一个循环中调用 wait,并在每次迭代中都测试条件谓词。

自定义 Java 同步工具

丢失的信号

丢失的信号是指:线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词。现在,线程将等待一个已经发生过的事件。这就好比在启动了烤面包机出去拿报纸,当你还在屋外时烤面包机的铃声响了,但你没有听到,因此还会坐在厨房的桌子前等着烤面包机的铃声。你可能会等待很长的时间(为了摆脱等待,其他人也不得不开始烤面包,从而使得情况变得糟糕,当铃声响起时,还要与别人争论这个面包是属于谁的。)如果线程 A 通知了一个条件队列,而线程 B 随后在这个条件队列上等待,那么线程 B 将不会立即醒来,而是需要另一个通知来唤醒它。编码错误(例如没有在调用 wait 之前检测条件谓词)会导致信号的丢失。

通知

在有界缓存中,如果缓存为空,在调用 take 时将阻塞。在缓存变为非空时,为了使 take 解除阻塞,必须确保在每条使缓存变为非空的代码路径中都发出一个通知。

在条件队列 API 中有两个发出通知的方法,即 notify 和 notifyAll。无论调用哪个,都必须持有与条件队列对象相关联的锁。在调用 notify 时,JVM 会从这个条件队列上等待的多个线程中选择一个来唤醒,而调用 notifyAll 则会唤醒所有在这个条件队列上等待的线程。由于在调用 notify 或 notifyAll 时必须持有条件队列对象的锁,而如果这些等待中线程此时不能重新获得锁,那么无法从 wait 返回,因此发出通知的线程应该尽快地释放锁,从而确保正在等待的线程尽可能快地解除阻塞。

由于多个线程可以基于不同的条件谓词在同一个条件队列上等待,因此如果使用 notify 而不是 notifyAll,那么将是一种危险的操作,因为单一的通知很容易导致类似于信号丢失的问题。

在 BoundedBuffer 中的条件队列用于两个不同的条件谓词:”非空” 和 “非满”。假设线程 A 在条件队列上等待条件谓词 PA,同时线程 B 在同一个条件队列上等待条件谓词 PB。现在,假设 PB 变成真,并且线程 C 执行一个 notify:JVM 将从它拥有的众多线程中选择一个并唤醒。如果选择了线程 A,那么它被唤醒,并且看到 PA 尚未变成真,因此将继续等待。同时,线程 B 本可以开始执行,却没有被唤醒。这并不是严格意义上的 “丢失信号”,更像一种 “被劫持的” 信号,但导致的问题时相同的:线程正在等待一个已经(或本应该)发生过的信号。

只有同时满足以下两个条件时,才能用单一的 notify 而不是 notifyAll:

  • 所有等待线程的类型都相同。只有一个条件谓词与条件队列相关,并且每个线程在从 wait 返回后将执行相同的操作。
  • 单进单出。在条件变量上的每次通知,最多只能唤醒一个线程来执行。

由于大多数类并不满足这些需求,因此普遍认可的做法是优先使用 notifyAll 而不是 notify。虽然 notifyAll 可能比 notify 更低效,但却更容易确保类的行为是正确的。这种低效情况带来的影响有时候很小,但有时候却非常大。如果有 10 个线程在一个条件队列上等待,那么调用 notifyAll 将唤醒每个线程,并使得它们在锁上发生竞争。然后,它们中的大多数或者全部又都回到休眠状态。因而,在每个线程执行一个事件的同时,将出现大量的上下文切换操作以及发生竞争的锁获取操作。(最坏的情况是,在使用 notifyAll 时将导致 O(n^2) 次唤醒操作,而实际上只需要 n 次唤醒操作就足够了)这是 “性能考虑因素与安全性考虑因素相互矛盾” 的另一种情况。

在 BoundedBuffer 的 put 和 take 方法中采用的通知机制是保守的,可以对其进行优化:首先,仅当缓存从空变为非空,或者从满转为非满时,才需要释放一个线程。并且,仅当 put 或 take 影响到这些状态转换时,才发出通知。这也被称为 “条件通知”。虽然可以提升性能,但却很难正确地实现(而且还会使子类的实现变得复杂),因此在使用时需谨慎。

自定义 Java 同步工具

单次通知和条件通知都属于优化措施。如果不正确地使用这些优化措施,很容易在程序中引入奇怪的活跃性故障。

示例:阀门类

虽然闭锁机制通常都能满足需求,但在某些情况下存在一个缺陷:按照这种方式构造的阀门在打开后无法重新关闭。通过使用条件等待,可以很容易地开发一个可重新关闭的 ThreadGate 类,可以打开和关闭阀门,并提供一个 await 方法,能一直阻塞到阀门被打开。

自定义 Java 同步工具

这种条件谓词时必需的,因为如果当阀门打开时有 N 个线程正在等待它,那么这些线程都应该被允许执行。然而,如果阀门在打开后又非常快速地关闭了,并且 await 方法只检查 isOpen,那么所有线程都可能无法释放:当所有线程收到通知时,将重新请求锁并退出 wait,而此时的阀门可能已经再次关闭了。因此,在 ThreadGate 中使用了一个更复杂的条件谓词:每次阀门关闭时,递增一个 Generation 计数器,如果阀门现在是打开的,或者阀门自从该线程到达后就一直是打开的,那么线程就可以通过 await。

由于 ThreadGate 只支持等待打开阀门,因此它只在 open 中执行通知。要想既支持等待打开又支持等待关闭,那么必须在 open 和 close 中都进行通知。这很好地说明了为什么在维护状态依赖的类时是非常困难的——当增加一个新的状态依赖操作时,可能需要对多条修改对象的代码路径进行改动,才能正确地执行通知。

子类的安全问题

要想支持子类化,在设计类时需要保证:如果在实施子类化时违背了条件通知或单次通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类。

对于状态依赖的类,要么将其等待和通知等协议完全向子类公开(并且写入正式文档),要么完全阻止子类参与到等待和通知等过程中。当设计一个可被继承的状态依赖类时,至少需要公开条件队列和锁,并且将条件谓词和同步策略都写入文档。此外,还可能需要公开一些底层的状态变量。

另外一种选择是完全禁止子类化,例如将类声明为 final 类型,或者将条件队列、锁和状态变量等隐藏起来,使子类看不见它们。否则,如果子类破坏了在基类中使用 notify 的方式,那么基类需要修复这种破坏。考虑一个无界的可阻塞栈,当栈为空时,pop 操作将阻塞,但 push 操作通常可以执行。这就满足了使用单次通知的需求。如果在这个类中使用了单次通知,并且在其一个子类中添加了一个阻塞的 “弹出两个连续元素” 方法,那么就会出现两种类型的等待线程:等待弹出一个元素的线程和等待弹出两个元素的线程。但如果基类将条件队列公开出来,并且将使用该条件队列的协议也写入文档,那么子类就可以将 push 方法改写为执行 notifyAll,从而重新确保安全性。

封装条件队列

把条件队列封装起来,因而除了使用条件队列的类,就不能在其他地方访问它。否则,调用者会自以为理解了在等待和通知上使用的协议,并且采用一种违背设计的方式来使用条件队列。

不幸的是将条件队列封装起来,与线程安全类的最常见设计模式并不一致,这种模式建议使用对象的内置锁来保护对象自身的状态。在 BoundedBuffer 中缓存对象自身既是锁,又是条件队列。对其重新设计为使用私有的锁对象和条件队列,唯一不同是重新设计后其将不再支持任何形式的客户端加锁。

显式的 Condition对象

在某些情况下,当内置锁过于不灵活时,可以使用显式锁。正如 Lock 是一种广义的内置锁,Condition 也是一种广义的内置条件队列。

自定义 Java 同步工具

内置条件队列存在一些缺陷,每个内置锁都只能有一个相关联的条件队列,因而在像 BoundedBuffer 这种类中,多个线程可能在同一个条件队列上等待不同的条件谓词,并且在最常见的加锁模式下公开条件队列对象。这些因素都使得无法满足在使用 notifyAll 时所有等待线程为同一类型的需求。如果想编写一个带有多个条件谓词的并发对象,或者想获得除了条件队列可见性之外的更多控制权,就可以使用显式的 Lock 和 Condition 而不是内置锁和条件队列,这是一种更灵活的选择。

一个 Condition 和一个 Lock 关联在一起,就像一个条件队列和一个内置锁相关联一样。要创建一个 Condition,可以在相关联的 Lock 上调用 Lock.newCondition 方法。Condition 同样比内置条件队列提供更丰富的功能:在每个锁上可存在多个等待、条件等待可以使可中断的或不可中断的、基于时限的等待,以及公平的或非公平的队列操作。

与内置条件队列不同的是,对于每个 Lock,可以有任意数量的 Condition 对象。Condition 对象继承了相关的 Lock 对象的公平性,对于公平的锁,线程会依照 FIFO 顺序从 Condition.await 中释放。在 Condition 对象中,与 wait、notify 和 notifyAll 方法对应的分别是 await、 signal 和 signalAll。但是,Condition 对 Object 进行了扩展,因而它也包含 wait 和 notify 方法。一定要确保使用正确的版本—— await 和 signal。

有界缓存的另一种实现,即使用两个 Condition,分别为 notFull 和 notEmpty,用于表示 “非满” 与 “非空” 两个条件谓词。当缓存为空时,take 将阻塞并等待 notEmpty,此时 put 向 notEmpty 发送信号,可以解除任何在 take 中阻塞的线程。

自定义 Java 同步工具

自定义 Java 同步工具

它对条件队列的使用方式更容易理解——在分析使用多个 Condition 的类时,比分析一个使用单一内部队列加多个条件谓词的类简单得多。通过将两个条件谓词分开并放到两个等待线程集中,Condition 使其更容易满足单次通知的需求。signal 比 signalAll 更高效,它能极大地减少在每次缓存操作中发生的上下文切换与锁请求次数。

与内置锁和条件队列一样,当使用显式的 Lock 和 Condition 时,也必须满足锁、条件谓词和条件变量之间的三元关系。在条件谓词中包含的变量必须由 Lock 来保护,并且在检查条件谓词以及调用 await 和 signal 时,必须持有 Lock 对象。

如果需要一些高级功能,例如使用公平的队列操作或者在每个锁上对应多个等待线程集,那么应该优先使用 Condition 而不是内置条件队列。(如果需要 ReentrantLock 的高级功能,并且已经使用了它,那么就应该选择 Condition)。

Synchronizer 剖析

在 ReentrantLock 和 Semaphore 这两个接口都可以用做一个 “阀门” 类,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用 lock 或 acquire 时成功返回),也可以等待(在调用 lock 或 acquire 时阻塞),还可以取消(在调用 tryLock 或 tryAcquire 时返回 false,表示在指定时间内锁是不可用的或者无法获得许可)。而且,这两个接口都支持可中断的、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。

或许你会认为 Semaphore 是基于 ReentrantLock 实现的,或者认为 ReentrantLock 实际上是带有一个许可的 Semaphore,这些实现方式都是可行的,可以通过锁来实现计数信号量,以及可以通过计数信号量来实现锁。

自定义 Java 同步工具

自定义 Java 同步工具

在实现时使用了一个共同的基类,即

AbstractQueuedSynchronizer(AQS),这个类也是其他许多同步类的基类。AQS 时一个用于构建锁和同步器的框架,许多同步器都可以通过 AQS 很容易并且高效地构造出来。不仅 ReentrantLock 和 Semaphore 时基于 AQS 构建的,还包括 CountDownLatch、ReentrantReadWriteLock、SynchronousQueue 和 FutureTask。

AQS 解决了在实现同步器时涉及的大量细节问题,例如等待线程采用 FIFO 队列操作顺序。在不同的同步器中还可以定义一些灵活的标准来判断某个线程时应该通过还是需要等待。

基于 AQS 来构建同步器能带来许多好处。它不仅能极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题(在没有使用 AQS 来构建同步器时的情况)。在 SemaphoreOnLock 中,获取许可的操作可能在两个时刻阻塞——当锁保护信号量状态时,以及当许可不可用时。

AbstractQueuedSynchronizer

如果能了解标准同步器类的实现方式,那么对于理解它们的工作原理是非常有帮助的。在基于 AQS 构建的同步器类中,最基本的操作包括各种形式的获取和释放操作,获取操作时一种依赖状态的操作,并且通常会阻塞。当使用锁或信号量时,获取操作的含义就很直观,即获取的是锁或许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。在使用 CountDownLatch 时,获取操作意味着 “等待并直到闭锁到达结束状态”,而在使用 FutureTask 时,则意味着 “等待并直到任务已经完成”。释放并不是一个可阻塞的操作,当执行释放操作时,所有在请求时被阻塞线程都会开始执行。

如果一个类想成为状态依赖的类,那么它必须拥有一些状态。AQS 负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过 getState,setState 以及 compareAndSetState 等 protected 类型方法来进行操作。这个整数可以用于表示任意状态。例如,ReentrantLock 用它来表示所有线程已经重复获取该锁的次数,Semaphore 用它来表示剩余的许可数量,FutureTask 用它来表示任务的状态(尚未开始、正在运行、已完成以及已取消)。在同步器类中还可以自行管理一些额外的状态变量,例如 ReentrantLock 保存了锁的当前所有者信息,以区分某个获取操作时重入还是竞争的。

根据同步器的不同,获取操作可以是一种独占操作(ReentrantLock),也可以是一种非独占操作(Semaphore 或 CountDownLatch)。一个获取操作包括两部分,首先,同步器判断当前状态是否允许获得操作,如果是,则允许线程执行,否则获取操作将阻塞或失败。这种判断是由同步器的语义决定的。例如,对于锁来说,如果它没有被某个线程持有,那么就能被成功地获取,而对于闭锁来说,如果它处于结束状态,那么也能被成功地获取。

其次,就是更新同步器的状态,获取同步器的某个线程可能会对其他线程能否也获取该同步器造成影响。例如,当获取一个锁后,锁的状态将从 “未被持有” 变成 “已被持有”,而从 Semaphore 中获取一个许可后,将把剩余许可的数量减 1。然而,当一个线程获取闭锁时,并不会影响其他线程能否获取它,因此获取闭锁的操作不会改变闭锁的状态。

如果某个同步器支持独占的获取操作,那么就需要实现一些保护方法,包括 tryAcquire、tryRelease 和 isHeldExclusively 等,而对于支持共享获取的同步器,则应该实现 tryAcquireShared 和 tryReleaseShared 等方法。AQS 中的 acquire、acquireShared、release 和 releaseShared 等方法都将调用这些方法在子类中带有前缀 try 的版本来判断某个操作是否能执行。在同步器的子类中,可以根据其获取操作和释放操作的语义,使用 getState、setState 以及 compareAndSetState 来检查和更新状态,并通过返回的状态值来告知基类 “获取” 或 “释放” 同步器的操作是否成功。例如,如果 tryAcquireShared 返回一个负值,那么表示获取操作失败,返回零值表示同步器通过独占方式被获取,返回正值则表示同步器通过非独占方式被获取。对于 tryRelease 和 tryReleaseShared 方法来说,如果释放操作使得所有在获取同步器时被阻塞的线程恢复执行,那么这两个方法应该返回 true。

一个简单的闭锁

OneShotLatch 是一个使用 AQS 实现的二元闭锁,包含两个公有方法:await 和 signal,分别对应获取操作和释放操作。起初,闭锁是关闭的,任何调用 await 的线程都将阻塞并直到闭锁被打开。当通过调用 signal 打开闭锁时,所有等待中的线程都将被释放,并且随后到达闭锁的线程也被允许执行。

自定义 Java 同步工具

在 OneShotLatch 中,AQS 状态用来表示闭锁状态——关闭(0)或者打开(1)。await 方法调用 AQS 的 acquireSharedInterruptibly,然后接着调用 OneShotLatch 中的 tryAcquireShared 方法。在 tryAcquireShared 的实现中必须返回一个值来表示该获取操作能够执行。如果之前已经打开了闭锁,那么 tryAcquireShared 将返回成功并允许线程通过,否则就会返回一个表示获取操作失败的值。acquireSharedInterruptibly 方法在处理失败的方式,是把这个线程放入等待线程队列中。signal 将调用 releaseShared,接下来又会调用 tryReleaseShared。在 tryReleaseShared 中将无条件地把闭锁的状态设置为打开,(通过返回值)表示该同步器处于完全被释放的状态。因而 AQS 让所有等待中的线程都尝试重新请求该同步器,并且由于 tryAcquireShared 将返回成功,因而现在的请求操作将成功。

OneShotLatch 可以通过拓展 AQS 来实现,而不是将一些功能委托给 AQS,但这种做法并不合理,这样做将破坏该接口(只有两个方法)的简洁性,并且虽然 AQS 的公共方法不允许调用者破坏闭锁的状态,但调用者仍可以容易地误用它们。java.util.concurrent 中的所有同步器都没有直接拓展 AQS,而都是将它们的相应功能委托给私有的 AQS 子类来实现。

AbstractQueuedSynchronizer

java.util.concurrent 中的许多可阻塞类,例如 ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch、SynchronousQueue 和 FutureTask 等,都是基于 AQS 构建的。

ReentrantLock

支持独占方式的获取操作,因此实现了

tryAcquire、tryRelease 和 IsHeldExclusively。

自定义 Java 同步工具

ReentrantLock 将同步状态由于保存锁获取操作的次数,并且还维护了一个 owner 变量来保存当前所有者线程的标识符,只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量(由于受保护的状态操作方法具有 volatile 类型的内存读写语义,同时 ReentrantLock 只是在调用 getState 之后才会读取 owner 域,并且只有在调用 setState 之前才会写入 owner,因此 ReentrantLock 可以拥有同步状态的内存语义,因此避免了进一步的同步)。在 tryRelease 中检查 owner 域,从而确保当前线程在执行 unlock 操作之前已经获取了锁:在 tryAcquire 中将使用这个域来区分获取操作是重入的还是竞争的。

当一个线程尝试获取锁时,tryAcquire 将首先检查锁的状态。如果锁未被持有,它将尝试更新锁的状态以表示锁已经被持有。由于状态可能在检查后被立即修改,因此 tryAcquire 使用 compareAndSetState 来原子地更新状态,表示这个锁已经被占有,并确保状态在最后一次检查以后就没有被修改过。如果锁状态表明它已经被持有,并且如果当前线程是锁的拥有者,那么获取计数会递增,如果当前线程不是锁的拥有者,那么获取操作将失败。

ReentrantLock 还利用 AQS 对多个条件变量和多个等待线程集的内置支持。Lock.newCondition 将返回一个新的 ConditionObject 实例,这是 AQS 的一个内部类。

Semaphore 与 CountDownLatch

Semaphore 将 AQS 的同步状态用于保存当前可用许可的数量。tryAcquireShared 方法首先计算剩余许可的数量,如果没有足够的许可,那么会返回一个值表示获取操作失败。如果还有剩余的许可,那么 tryAcquireShared 会通过 compareAndSetState 以原子方式来降低许可的计数。如果这个操作成功(这意味着许可的计数从上一次读取后就没有被修改过),那么将返回一个值表示获取操作成功。在返回值中还包含了表示其他共享获取操作能否成功的信息,如果成功,那么其他等待的线程同样会解除阻塞。

自定义 Java 同步工具

当没有足够的许可,或者当 tryAcquireShared 可以通过原子方式来更新许可的计数以响应获取操作时,while 循环将终止。虽然对 compareAndSetState 的调用可能由于与另一个线程发生竞争而失败,并使其重新尝试,但在经过了一定次数的重试操作以后,在这两个结束条件中有一个会变为真。同样,tryReleaseShared 将增加许可计数,这可能会解除等待中线程的阻塞状态,并且不断地重试直到更新操作成功。tryReleaseShared 的返回值表示在这次释放操作中解除了其他线程的阻塞。

CountDownLatch 使用 AQS 的方式与 Semaphore 相似:在同步状态中保存的是当前的计数值,countDown 方法调用 release,从而导致计数值递减,并且当计数值为零时,解除所有等待线程的阻塞。await 调用 acquire,当计数器为零时,acquire 将立即返回,否则将阻塞。

FutureTask

FutureTask 不像一个同步器,但 Future.get 的语义非常类似于闭锁的语义——如果发生了某个事件(由 FutureTask 表示的任务执行完成或被取消),那么线程就可以恢复执行,否则这些线程将停留在队列中并直到该事件完成。

在 FutureTask 中,AQS 同步状态被用来保存任务的状态,例如,正在运行、已完成或已取消。FutureTask 还维护一些额外的状态变量,用来保存计算结果或抛出的异常。此外,还维护了一个引用,指向正在执行计算任务的线程(如果它当前处于运行状态),因而如果任务取消,该线程就会中断。

ReentrantReadWriteLock

ReadWriteLock 接口表示存在两个锁:一个读取锁和一个写入锁,但在基于 AQS 实现的 ReentrantReadWriteLock 中,单个 AQS 子类将同时管理读取加锁和写入加锁。ReentrantReadWriteLock 使用一个 16 位的状态来表示写入锁的计数,并且使用另一个 16 位的状态来表示读取锁的计数。在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法。

AQS 在内部维护一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。在 ReentrantReadWriteLock 中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程会得到这个锁,如果位于头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁。(这种机制并不允许选择读取线程优先或写入线程优先等策略,在某些读写锁实现中也采用这种方式。因此,要么 AQS 的等待队列不能是一个 FIFO 队列,要么使用两个队列。然而,在实际中很少需要这么严格的排序策略。如果非公平版本中 ReentrantReadWriteLock 无法提供足够的活跃性,那么公平版本的 ReentrantReadWriteLock 通常会提供令人满意的排序保证,并且能确保读取线程和写入线程不会发生饥饿问题。)

文章来源:智云一二三科技

文章标题:自定义 Java 同步工具

文章地址:https://www.zhihuclub.com/201192.shtml

关于作者: 智云科技

热门文章

网站地图