JUC - ScheduledThreadPoolExecutor.md
JUC 系列之 ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 实时调度
ScheduledThreadPoolExecutor 派生于 ThreadPoolExecutor,同时实现了 ScheduledExecutorService 接口,主要实现了任务的实时调度。
方法 | 说明 |
---|---|
schedule | 从当前时间开始多少时间后执行 |
scheduleAtFixedRate | 从当前时间开始多少时间后执行,之后按另一个时间周期性执行,周期时间从任务开始开始算,不受到任务执行时间影响 |
scheduleWithFixedDelay | 从当前时间开始多少时间后执行,之后按另一个时间周期性执行,周期时间从任务结束开始算,受到任务执行时间影响 |
RunnableScheduledTask
这是实时调度里一个很重要的 Future 的派生类:
主要是派生于 FutureTask 类,其他都为接口限制,先来到构造函数:
/**
* Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(Runnable r, V result, long triggerTime,
long sequenceNumber) {
super(r, result);
this.time = triggerTime;
this.period = 0;
this.sequenceNumber = sequenceNumber;
}
/**
* Creates a periodic action with given nanoTime-based initial
* trigger time and period.
*/
ScheduledFutureTask(Runnable r, V result, long triggerTime,
long period, long sequenceNumber) {
super(r, result);
this.time = triggerTime;
this.period = period;
this.sequenceNumber = sequenceNumber;
}
/**
* Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(Callable<V> callable, long triggerTime,
long sequenceNumber) {
super(callable);
this.time = triggerTime;
this.period = 0;
this.sequenceNumber = sequenceNumber;
}
可以看到除了 FutureTask 需要的构造参数,还增加了几个参数,如下:
名称 | 说明 |
---|---|
time | 任务最早可执行时间点 |
period | 正值表示固定时间周期执行。 负值表示固定延迟周期执行。 0表示非重复任务。 |
sequenceNumber | 递增的 index,当时间相同时按该标志排序,一般使用原子计数器实现 |
有一些辅助方法:
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.nanoTime(), NANOSECONDS);
}
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
/**
* Returns {@code true} if this is a periodic (not a one-shot) action.
*
* @return {@code true} if periodic
*/
public boolean isPeriodic() {
return period != 0;
}
/**
* Sets the next time to run for a periodic task.
*/
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
- getDelay 获取延时时间
- compareTo 为了排序,按时间排序,时间相同按 sequenceNumber 排序
- isPeriodic 判断是否是周期性任务
- setNextRunTime 设置下次运行时间(周期性任务才需要)
来看看 run 方法:
public void run() {
if (!canRunInCurrentRunState(this))
cancel(false);
else if (!isPeriodic())
super.run();
else if (super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
非周期任务直接调用父类的 run 方法,否则则调用父类的 runAndReset
方法,执行的同时重设各种 flags ,设置下次运行时间,调用 reExecutePeriodic 重新执行 。
ScheduledThreadPoolExecutor 调度实现
为了探究它是如何实现实时调度的,我们从实时任务提交入手,以 schedule(Callable, long ,TimeUnit)
方法为例:
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit),
sequencer.getAndIncrement()));
delayedExecute(t);
return t;
}
主要为 实例化一个 ScheduledFutureTask
类,然后调用 decorateTask
方法装饰,最后调用 delayedExecute(t);
方法将任务提交。
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
secorateTask
并没有实现任何功能,这是一个钩子提供给子类去重写的,因此我们直接来到 delayedExecute 方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (!canRunInCurrentRunState(task) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
判断当前能否接收,能的话就调用 super.getQueue().add(task);
将其加入任务队列中,否则就拒绝 。
并且如果成功加入了任务队列,需要调用 ensurePrestart();
方法
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
该方法会添加一个 Worker 来执行,因为实时调度系统需要保证实时性,因此当线程数量等于核心线程数量时,如果有新的任务提交,依然会创建新线程,只要小于最大线程数量 。
任务队列 - DelayedWorkQueue
来看看 ScheduledTrheadPoolExecutor 使用的队列,来到构造方法:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
可以看到调用了父类的构造方法中使用了 DelayedWorkQueue
该数据结构为 ScheduledThreadPoolExecutor 的内部类,这里使用了堆来实现一个优先队列,排序规则为刚刚 Task 重写 的 compareTo 方法 。
其中的 shiftDown
与 shiftUp
为堆的上浮和下沉操作,这里不做说明,来到出队和入队的方法:
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
首先使用了一个 ReentrantLock
保证同步操作,然后就是堆的入堆操作,其中 grow
用于扩展数组,注意中间的:
if (queue[0] == e) {
leader = null;
available.signal();
}
当入堆后,当前元素为队首,则调用一下该条件队列的 signal 操作 。唤醒所有等空堆的线程。
之后是出堆方法 :
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
死循环,还是可重入锁,然后判断堆首元素是否到达执行时间,如果没到,则调用 available.awaitNanos(delay)
等待队列中的元素并让出锁,这里 delay 为堆首元素的等待时间,当时间到之后,线程会重新竞争锁,此时堆首元素一般为可以直接执行的任务,此时该线程会将 leader 设置为 null,然后进入下一次循环 。
这里 leader 表示当前等待队列中的首线程,这里主要是为了当多个线程等待同一个堆首元素,休眠同样时间后醒来,前一个线程将元素出队后该线程出现异常状态,这里当发现这种情况后需要重新等待队中元素:
if (leader != null)
available.await();
这里调用的 finishPoll
为调用出堆逻辑,将结点与尾结点交换后对换上来的结点进行下沉操作 。
至此,该队列可以做到调用 take 之后,会一直阻塞等到第一个符合条件的元素,元素需要按时间排序,靠堆实现,然后堆头元素需要标记的时间 <= 0 才有可能出堆,而出堆后会来到 ScheduledFutureTask 进行处理,如果是循环的会再次提交 。