日常开发中经常会遇到需要同时启动多个线程去并行执行任务的情况,但如果使用循环启动多个线程,并不是真正意义上的同时启动,JDK提供了CountDownLatch和CyclicBarrier可以很好的处理这个问题。本文先讲讲CountDownLatch的实现方式和原理。首先来看一个案例
public class CountDownLatchTest {
public static void main(String[] args) {
int n = 5;
CountDownLatch countDownLatch = new CountDownLatch(n);
for (int i = 0; i < n; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
//等待所有线程就绪
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " 开始时间:" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.setName("线程-" + i);
thread.start();
System.out.println(thread.getName() + " 就绪时间:" + System.currentTimeMillis());
//countDownLatch内部计数器减1
countDownLatch.countDown();
}
}
}
运行结果如下: 可以看出,使用CountDownLatch实现了真正的同时启动线程
线程-0 就绪时间:1609207609803
线程-1 就绪时间:1609207609804
线程-2 就绪时间:1609207609804
线程-3 就绪时间:1609207609805
线程-4 就绪时间:1609207609805
线程-0 开始时间:1609207609805
线程-1 开始时间:1609207609805
线程-2 开始时间:1609207609805
线程-3 开始时间:1609207609805
线程-4 开始时间:1609207609805
原理分析
从上面的实例可以看出,初始化CountDownLatch时会初始化其内部的计数器,然后每次调用countDown()方法时计数器会减1,当计数器减为0时线程才会开始执行。下面我们通过源码来看看CountDownLatch是如何初始化内部计数器,在何时计数器会递减以及计数器变为0时做了什么操作。
从上面的类图可以看到,CountDownLatch内部维护了一个Sync内部类,而这个内部类是继承自AQS(AbstractQueuedSynchronizer)的。通过对CountDownLatch构造函数的分析会发现,其实CountDownLatch是通过把计数器的值赋值给AQS的state变量来进行管理的。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
主要方法分析
void await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//AQS获取共享资源时可被中断的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果线程被中断则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//调用Sync方法判断计数器值是否为0,为0则直接返回,否则进入AQS队列等待
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//Sync实现AQS的方法判断计数器值是否为0
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
由以上代码可知,await()方法获取共享资源时可以被中断,主要作用是判断计数器值是否为0,为0则直接返回,否则调用AQS的doAcquireSharedInterruptibly方法让线程阻塞。
boolean await(long timeout, TimeUnit unit)
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
当线程调用该方法后,当前线程会被阻塞,与await()方法不同的是,此方法有个超时时间,当设置的timeout时间到了,会因为超时返回false。
void countDown()
public void countDown() {
sync.releaseShared(1);
}
//AQS释放资源的方法
public final boolean releaseShared(int arg) {
//调用Sync实现的AQS方法
if (tryReleaseShared(arg)) {
//释放资源
doReleaseShared();
return true;
}
return false;
}
//Sync实现的AQS方法
protected boolean tryReleaseShared(int releases) {
//循环CAS给state进行递减并赋值
for (;;) {
int c = getState();
//state等于0时直接返回,避免出现负数的情况
if (c == 0)
return false;
//使用CAS让计数器减1
int nextc = c-1;
if (compareAndSetState(c, nextc))
//等于0则释放资源
return nextc == 0;
}
}
countDown()方法首先判断state的值是否为0,为0则直接返回,否则使用CAS将计数器值减1,如果减1后变为0则表示是最后一个线程调用countDown()方法,此时还需要调用doReleaseShared()方法唤醒因为await()而阻塞的线程。
总结
CountDownLatch是使用AQS实现的,其使用AQS的状态值来存储计数器的值,在初始化的时候设置计数器的值,调用await()方法将当前线程放入AQS的阻塞队列等待计数器的值变为0后返回。多个线程调用countDown()方法时会原子性地对计数器的值递减1,当计数器的值变为0时,当前线程会调用AQS的doReleaseShared()方法释放资源,从而激活被await()方法阻塞的线程。