JUC - ScheduledThreadPoolExecutor.md

853

JUC 系列之 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 实时调度

ScheduledThreadPoolExecutor 派生于 ThreadPoolExecutor,同时实现了 ScheduledExecutorService 接口,主要实现了任务的实时调度。

image20210729110002203.png

image20210729110125985.png

方法说明
schedule从当前时间开始多少时间后执行
scheduleAtFixedRate从当前时间开始多少时间后执行,之后按另一个时间周期性执行,周期时间从任务开始开始算,不受到任务执行时间影响
scheduleWithFixedDelay从当前时间开始多少时间后执行,之后按另一个时间周期性执行,周期时间从任务结束开始算,受到任务执行时间影响

RunnableScheduledTask

这是实时调度里一个很重要的 Future 的派生类:

image20210729111644067.png

主要是派生于 FutureTask 类,其他都为接口限制,先来到构造函数:

/**
         * Creates a one-shot action with given nanoTime-based trigger time.
         */
        ScheduledFutureTask(Runnable r, V result, long triggerTime,
                            long sequenceNumber) {
            super(r, result);
            this.time = triggerTime;
            this.period = 0;
            this.sequenceNumber = sequenceNumber;
        }

        /**
         * Creates a periodic action with given nanoTime-based initial
         * trigger time and period.
         */
        ScheduledFutureTask(Runnable r, V result, long triggerTime,
                            long period, long sequenceNumber) {
            super(r, result);
            this.time = triggerTime;
            this.period = period;
            this.sequenceNumber = sequenceNumber;
        }

        /**
         * Creates a one-shot action with given nanoTime-based trigger time.
         */
        ScheduledFutureTask(Callable<V> callable, long triggerTime,
                            long sequenceNumber) {
            super(callable);
            this.time = triggerTime;
            this.period = 0;
            this.sequenceNumber = sequenceNumber;
        }

可以看到除了 FutureTask 需要的构造参数,还增加了几个参数,如下:

名称说明
time任务最早可执行时间点
period正值表示固定时间周期执行。
负值表示固定延迟周期执行。
0表示非重复任务。
sequenceNumber递增的 index,当时间相同时按该标志排序,一般使用原子计数器实现

有一些辅助方法:

public long getDelay(TimeUnit unit) {
    return unit.convert(time - System.nanoTime(), NANOSECONDS);
}

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

/**
         * Returns {@code true} if this is a periodic (not a one-shot) action.
         *
         * @return {@code true} if periodic
         */
public boolean isPeriodic() {
    return period != 0;
}

/**
         * Sets the next time to run for a periodic task.
         */
private void setNextRunTime() {
    long p = period;
    if (p > 0)
        time += p;
    else
        time = triggerTime(-p);
}
  • getDelay 获取延时时间
  • compareTo 为了排序,按时间排序,时间相同按 sequenceNumber 排序
  • isPeriodic 判断是否是周期性任务
  • setNextRunTime 设置下次运行时间(周期性任务才需要)

来看看 run 方法:

public void run() {
    if (!canRunInCurrentRunState(this))
        cancel(false);
    else if (!isPeriodic())
        super.run();
    else if (super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

非周期任务直接调用父类的 run 方法,否则则调用父类的 runAndReset 方法,执行的同时重设各种 flags ,设置下次运行时间,调用 reExecutePeriodic 重新执行 。

ScheduledThreadPoolExecutor 调度实现

为了探究它是如何实现实时调度的,我们从实时任务提交入手,以 schedule(Callable, long ,TimeUnit) 方法为例:

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit),
                                       sequencer.getAndIncrement()));
        delayedExecute(t);
        return t;
    }

主要为 实例化一个 ScheduledFutureTask 类,然后调用 decorateTask 方法装饰,最后调用 delayedExecute(t); 方法将任务提交。


    protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable, RunnableScheduledFuture<V> task) {
        return task;
    }

    protected <V> RunnableScheduledFuture<V> decorateTask(
        Callable<V> callable, RunnableScheduledFuture<V> task) {
        return task;
    }

secorateTask 并没有实现任何功能,这是一个钩子提供给子类去重写的,因此我们直接来到 delayedExecute 方法:

   private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(task) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

判断当前能否接收,能的话就调用 super.getQueue().add(task); 将其加入任务队列中,否则就拒绝 。

并且如果成功加入了任务队列,需要调用 ensurePrestart(); 方法

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

该方法会添加一个 Worker 来执行,因为实时调度系统需要保证实时性,因此当线程数量等于核心线程数量时,如果有新的任务提交,依然会创建新线程,只要小于最大线程数量 。

任务队列 - DelayedWorkQueue

来看看 ScheduledTrheadPoolExecutor 使用的队列,来到构造方法:

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

可以看到调用了父类的构造方法中使用了 DelayedWorkQueue

image20210729145251053.png

该数据结构为 ScheduledThreadPoolExecutor 的内部类,这里使用了堆来实现一个优先队列,排序规则为刚刚 Task 重写 的 compareTo 方法 。

其中的 shiftDownshiftUp 为堆的上浮和下沉操作,这里不做说明,来到出队和入队的方法:

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

首先使用了一个 ReentrantLock 保证同步操作,然后就是堆的入堆操作,其中 grow 用于扩展数组,注意中间的:

if (queue[0] == e) {
    leader = null;
    available.signal();
}

当入堆后,当前元素为队首,则调用一下该条件队列的 signal 操作 。唤醒所有等空堆的线程。

之后是出堆方法 :

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0L)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

死循环,还是可重入锁,然后判断堆首元素是否到达执行时间,如果没到,则调用 available.awaitNanos(delay) 等待队列中的元素并让出锁,这里 delay 为堆首元素的等待时间,当时间到之后,线程会重新竞争锁,此时堆首元素一般为可以直接执行的任务,此时该线程会将 leader 设置为 null,然后进入下一次循环 。

这里 leader 表示当前等待队列中的首线程,这里主要是为了当多个线程等待同一个堆首元素,休眠同样时间后醒来,前一个线程将元素出队后该线程出现异常状态,这里当发现这种情况后需要重新等待队中元素:

if (leader != null)
	available.await();

这里调用的 finishPoll 为调用出堆逻辑,将结点与尾结点交换后对换上来的结点进行下沉操作 。

至此,该队列可以做到调用 take 之后,会一直阻塞等到第一个符合条件的元素,元素需要按时间排序,靠堆实现,然后堆头元素需要标记的时间 <= 0 才有可能出堆,而出堆后会来到 ScheduledFutureTask 进行处理,如果是循环的会再次提交 。