CountDownLatch & CyclicBarrier.md
JUC 系列之 CountDownLatch 与 CycleBarrier
CountDownLatch
该类比较简单,因为分析过 AQS,这里直接看 Sync 类:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
this.setState(count);
}
int getCount() {
return this.getState();
}
protected int tryAcquireShared(int acquires) {
return this.getState() == 0 ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
int c;
int nextc;
do {
c = this.getState();
if (c == 0) {
return false;
}
nextc = c - 1;
} while(!this.compareAndSetState(c, nextc));
return nextc == 0;
}
}
这里 尝试加锁直接判断当前计数器是否为 0,如果不为 0,则所有线程都会加锁失败。
而释放锁为使用 CAS 自旋操作不断修改 state,直到成功。
来看 CountDownLatch 比较常用的方法:
public void await() throws InterruptedException {
this.sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return this.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
都是直接调用 sync 的相关方法,即 AQS 的相关方法
public void countDown() {
this.sync.releaseShared(1);
}
CyclicBarrier
CyclicBarrier 相当于一个线程运行屏障,当线程来到这会进入屏障阻塞,等到等待的线程到达一定数量了才开放这几个线程通过,然后继续阻塞。
可以看到 CycleBarrier 实际上并没有直接调用 AQS 的相关方法,而是使用了 可重入锁 ReentrantLock 与条件队列 Condition 。
此外,还维护了一个 Generation 表示屏障实体,其中 broken 表示该屏障是否打开。
先来看看构造方法:
public CyclicBarrier(int parties, Runnable barrierAction) {
this.lock = new ReentrantLock();
this.trip = this.lock.newCondition();
this.generation = new CyclicBarrier.Generation();
if (parties <= 0) {
throw new IllegalArgumentException();
} else {
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
}
public CyclicBarrier(int parties) {
this(parties, (Runnable)null);
}
parties 表示闸门的临界值, barrierAction 表示闸门开启之前要调用的方法,也就是一个监听的作用,这里没啥好说的。
来看看几个无关痛痒的方法:
private void nextGeneration() {
this.trip.signalAll();
this.count = this.parties;
this.generation = new CyclicBarrier.Generation();
}
private void breakBarrier() {
this.generation.broken = true;
this.count = this.parties;
this.trip.signalAll();
}
nextGeneration
为唤醒当前所有等待线程,将计数器复原,同时创建新的 Generation 对象,可以理解为重新开始。
breakBarrier
为开闸的方法,首先将 generation.broken
设置为 true,然后复原计数器,在唤醒所有等待线程 。
接下来看看等待方法:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return this.dowait(false, 0L);
} catch (TimeoutException var2) {
throw new Error(var2);
}
}
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
return this.dowait(true, unit.toNanos(timeout));
}
可以看到最终都是来到 dowait 中:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
ReentrantLock lock = this.lock;
lock.lock();
byte var9;
try {
CyclicBarrier.Generation g = this.generation;
if (g.broken) {
throw new BrokenBarrierException();
}
if (Thread.interrupted()) {
this.breakBarrier();
throw new InterruptedException();
}
int index = --this.count;
if (index != 0) {
do {
try {
if (!timed) {
this.trip.await();
} else if (nanos > 0L) {
nanos = this.trip.awaitNanos(nanos);
}
} catch (InterruptedException var19) {
if (g == this.generation && !g.broken) {
this.breakBarrier();
throw var19;
}
Thread.currentThread().interrupt();
}
if (g.broken) {
throw new BrokenBarrierException();
}
if (g != this.generation) {
int var21 = index;
return var21;
}
} while(!timed || nanos > 0L);
this.breakBarrier();
throw new TimeoutException();
}
boolean ranAction = false;
try {
Runnable command = this.barrierCommand;
if (command != null) {
command.run();
}
ranAction = true;
this.nextGeneration();
var9 = 0;
} finally {
if (!ranAction) {
this.breakBarrier();
}
}
} finally {
lock.unlock();
}
return var9;
}
可以看到首先判断计数器,如果还需要等待则是一个监听时间的死循环,然后核心在 this.trip.await()
,调用了 condition
的条件,等待唤醒,当然在此之前需要使用 Reentrant 加锁 。
注意到循环中有个地方可以跳出循环:
if (g != this.generation) {
int var21 = index;
return var21;
}
当被唤醒后发现 generation 改变,则直接返回。
如果已经不需要等待了(计数器为 0),则会跳过循环继续往下执行,可以看到最终会调用监听 的 runnable 的方法,然后调用 this.nextGeneration();
方法换新的闸门,同时,如果 runnable 中抛出异常,则会直接来到 finally,执行 reakBarrier() 方法。
考虑一种情况,条件队列中有若干个条件进行阻塞,而外部手动调用 reset 方法:
public void reset() {
ReentrantLock lock = this.lock;
lock.lock();
try {
this.breakBarrier();
this.nextGeneration();
} finally {
lock.unlock();
}
}
可以看到这里开闸并切换闸,这势必会导致 Condition 中的线程被唤醒开始抢锁,而此时锁一定被调用 reset 的线程占用,因此等到 reset 执行完毕,来到 unlock 的时候,线程中就会有一个线程获得锁,并从 this.trip.await()
继续执行,因此时 Generation 已经改变,因此会直接返回 (依然会执行 finally 中的 unlock 方法)。