CountDownLatch & CyclicBarrier.md

765

JUC 系列之 CountDownLatch 与 CycleBarrier

CountDownLatch

image20210728104926715.png

该类比较简单,因为分析过 AQS,这里直接看 Sync 类:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            this.setState(count);
        }

        int getCount() {
            return this.getState();
        }

        protected int tryAcquireShared(int acquires) {
            return this.getState() == 0 ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            int c;
            int nextc;
            do {
                c = this.getState();
                if (c == 0) {
                    return false;
                }

                nextc = c - 1;
            } while(!this.compareAndSetState(c, nextc));

            return nextc == 0;
        }
    }

这里 尝试加锁直接判断当前计数器是否为 0,如果不为 0,则所有线程都会加锁失败。

而释放锁为使用 CAS 自旋操作不断修改 state,直到成功。

来看 CountDownLatch 比较常用的方法:

    public void await() throws InterruptedException {
        this.sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return this.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

都是直接调用 sync 的相关方法,即 AQS 的相关方法

public void countDown() {
    this.sync.releaseShared(1);
}

CyclicBarrier

CyclicBarrier 相当于一个线程运行屏障,当线程来到这会进入屏障阻塞,等到等待的线程到达一定数量了才开放这几个线程通过,然后继续阻塞。

image20210728105928853.png

可以看到 CycleBarrier 实际上并没有直接调用 AQS 的相关方法,而是使用了 可重入锁 ReentrantLock 与条件队列 Condition 。

此外,还维护了一个 Generation 表示屏障实体,其中 broken 表示该屏障是否打开。

先来看看构造方法:

    public CyclicBarrier(int parties, Runnable barrierAction) {
        this.lock = new ReentrantLock();
        this.trip = this.lock.newCondition();
        this.generation = new CyclicBarrier.Generation();
        if (parties <= 0) {
            throw new IllegalArgumentException();
        } else {
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    }

    public CyclicBarrier(int parties) {
        this(parties, (Runnable)null);
    }

parties 表示闸门的临界值, barrierAction 表示闸门开启之前要调用的方法,也就是一个监听的作用,这里没啥好说的。

来看看几个无关痛痒的方法:

    private void nextGeneration() {
        this.trip.signalAll();
        this.count = this.parties;
        this.generation = new CyclicBarrier.Generation();
    }

    private void breakBarrier() {
        this.generation.broken = true;
        this.count = this.parties;
        this.trip.signalAll();
    }

nextGeneration 为唤醒当前所有等待线程,将计数器复原,同时创建新的 Generation 对象,可以理解为重新开始。

breakBarrier 为开闸的方法,首先将 generation.broken 设置为 true,然后复原计数器,在唤醒所有等待线程 。

接下来看看等待方法:

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return this.dowait(false, 0L);
        } catch (TimeoutException var2) {
            throw new Error(var2);
        }
    }

    public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
        return this.dowait(true, unit.toNanos(timeout));
    }

可以看到最终都是来到 dowait 中:

    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        ReentrantLock lock = this.lock;
        lock.lock();

        byte var9;
        try {
            CyclicBarrier.Generation g = this.generation;
            if (g.broken) {
                throw new BrokenBarrierException();
            }

            if (Thread.interrupted()) {
                this.breakBarrier();
                throw new InterruptedException();
            }

            int index = --this.count;
            if (index != 0) {
                do {
                    try {
                        if (!timed) {
                            this.trip.await();
                        } else if (nanos > 0L) {
                            nanos = this.trip.awaitNanos(nanos);
                        }
                    } catch (InterruptedException var19) {
                        if (g == this.generation && !g.broken) {
                            this.breakBarrier();
                            throw var19;
                        }

                        Thread.currentThread().interrupt();
                    }

                    if (g.broken) {
                        throw new BrokenBarrierException();
                    }

                    if (g != this.generation) {
                        int var21 = index;
                        return var21;
                    }
                } while(!timed || nanos > 0L);

                this.breakBarrier();
                throw new TimeoutException();
            }

            boolean ranAction = false;

            try {
                Runnable command = this.barrierCommand;
                if (command != null) {
                    command.run();
                }

                ranAction = true;
                this.nextGeneration();
                var9 = 0;
            } finally {
                if (!ranAction) {
                    this.breakBarrier();
                }

            }
        } finally {
            lock.unlock();
        }

        return var9;
    }

可以看到首先判断计数器,如果还需要等待则是一个监听时间的死循环,然后核心在 this.trip.await(),调用了 condition 的条件,等待唤醒,当然在此之前需要使用 Reentrant 加锁 。

注意到循环中有个地方可以跳出循环:

if (g != this.generation) {
    int var21 = index;
    return var21;
}

当被唤醒后发现 generation 改变,则直接返回。

如果已经不需要等待了(计数器为 0),则会跳过循环继续往下执行,可以看到最终会调用监听 的 runnable 的方法,然后调用 this.nextGeneration();方法换新的闸门,同时,如果 runnable 中抛出异常,则会直接来到 finally,执行 reakBarrier() 方法。

考虑一种情况,条件队列中有若干个条件进行阻塞,而外部手动调用 reset 方法:

    public void reset() {
        ReentrantLock lock = this.lock;
        lock.lock();

        try {
            this.breakBarrier();
            this.nextGeneration();
        } finally {
            lock.unlock();
        }

    }

可以看到这里开闸并切换闸,这势必会导致 Condition 中的线程被唤醒开始抢锁,而此时锁一定被调用 reset 的线程占用,因此等到 reset 执行完毕,来到 unlock 的时候,线程中就会有一个线程获得锁,并从 this.trip.await() 继续执行,因此时 Generation 已经改变,因此会直接返回 (依然会执行 finally 中的 unlock 方法)。