JUC - ThreadPoolExecutor.md
JUC 系列之 ThreadPoolExecutor
ThreadPoolExecutor 使用
该类有四个构造方法:
来看看第四个最复杂的:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
这里给出含义:
序号 | 名称 | 类型 | 含义 |
---|---|---|---|
1 | corePoolSize | int | 核心线程数量 |
2 | maxmumPoolSize | int | 最大线程数量 |
3 | keepAliveTime | long | 线程最大空闲时间 |
4 | unit | TimeUnit | 时间单位 |
5 | workQueue | BlockingQueue<Runnable> | 线程等待队列(存放任务) |
6 | threadFactory | ThreadFactory | 线程创建工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
当一个任务提交到线程池,线程池会根据当前状态决定任务的运行线程:
- 若当前少于 corePoolSize 数量的线程正在运行,则一定会新建线程运行
- 当多于 corePoolSize 少于 maxmumPoolSize 数量的线程正在运行,则仅当队列已满时才会新建线程(任务队列已经放不下了,必须执行)
- 当多于 maxmunPoolSize 数量的线程正在运行,且任务队列已满,则会拒绝该任务,最终处理交给拒绝策略
此外,可以调用 prestartCoreThread() 预创建一个空闲任务核心线程或调用 prestartAllCoreThreads() 预创建 corePoolSize 个核心线程。
当需要创建要给新线程,需要调用创建工厂的 newThread 方法,在这里可以指定线程的名称,线程组,优先级等信息。
如果线程池当前拥有超过 corePoolSize 的线程,那么多余的线程在空闲时间超过 keepAliveTime 时会被终止。可以调用 allowCoreThreadTimeOut(boolean) 指定是否需要在超时之后将核心线程也终止。
当提交的任务无法被马上执行后,任务会加入任务队列,这里可以使用任何 BlockingQueue 的子类,当然有几个默认队列在之后会介绍。
有两种情况下任务会被拒绝:
- 线程池已经关闭
- 线程数量已经到达 maxmumPoolSize 并且没有空闲,同时任务队列已满
当一个任务被拒绝,会调用 RejectedExecutionHandler 的 rejectedExecution 方法,我们可以自定义自己的拒绝策略,也可以使用默认的几种,在之后将会介绍 。
当我们构造一个 ThreadPoolExecutor 之后,我们可以调用 execute(Runable)
提交任务
ThreadPoolExecutor 默认 实现
JUC 中有 ThreadPoolExecutor 的一些默认实现,在 Executors 类中,来分别介绍:
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- corePoolSize 于 maximumPoolSize 相等,所有线程都是核心线程
- 使用 LinkedBlockingQueue 作为任务队列,其空间最大为
Integer.MAX_VALUE
,几乎在占满之前内存已经溢出 。 - 任务执行无序
CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 核心线程为 0 ,即所有线程都会受到 KeepAliveTime 的限制
- maximunPoolSize 为 Integer.MAX_VALUE,几乎为无限制
- KeepAliveTime 为 60 s
- 使用 SynchronousQueue 同步队列作为任务队列,入队后需要出队后才可继续入队,因为使用该线程池几乎不会有任务需要在队列等待(线程最大值为 Int 最大值),因此直接使用同步队列
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
本质上是一个 newFixedThreadPool(1),但其使用了 FinalizableDelegatedExecutorService 类包装,保证其线程数量只能为 1 不能修改
WorkStealingPool
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
其使用的是 ForkJoinPool 将在之后介绍
ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
其使用的是 ScheduledThreadPoolExecutor ,为 ThreadPoolExexutor 的一个派生类,将在之后介绍。
自定义 ThreadPoolExecutor
自定义 ThreadPoolExecutor 需要确定要使用的 任务队列,线程工厂与拒绝策略,这里分别介绍
任务队列总体来说有三种策略
- 直接握手
一个很好的使用该策略的任务队列是 SynchronousQueue
,相当于没有任务队列,所有任务都会尝试新建线程,如果线程数量大于 maximunPoolSize 则会直接拒绝。
- 无界队列
一个很好的使用该策略的任务队列是 LinkedBlockingQueue
(其界限为 Integer.MAX_VALUE,约等于无界),使用该队列时,maximunPoolSize 将失去作用,因为当线程数量大于 corePoolSize 时必须要队列满才会创建新线程,因此对于无界队列,相当于线程最大大小限制为 corePoolSize
- 有界队列
一个很好的使用该策略的任务队列是 ArrayBlockingQueue
,使用该队列配合 maximunPoolSize 能够有助于提高效率,不过其值比较难以控制,需要相互权衡。
可以实现自己的拒绝策略,也可以是默认实现的几种:
- AbortPolicy:抛出 RejectedExecutionException 异常,当不指定时默认
- CallerRunsPolicy:如果线程池还没关闭,则直接运行失败的任务(不放入线程池),如果线程池已经关闭,则丢弃
- DiscardPolicy:直接丢弃
- DiscardOldestPolicy:丢弃队头任务,并再次尝试
此外还可以实现自己的策略:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
对于线程工厂,可以使用默认工厂:
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
当需要自定义时,直接实现 ThreadFactory
以及 newThread 方法即可 。
ThreadPoolExecutor 状态
ThreadPoolExecutor 的状态使用一个 AtomicInteger 的 ctl 变量表示,使用了状态压缩,其中高 3 位表示 5 种状态,低 29 位表示线程数量,相关变量和方法如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中五种状态如下:
状态 | 说明 |
---|---|
RUNNING | 运行,可以处理新任务与任务队列中的任务 |
SHUTDOWN | 关闭,不接受新任务,但执行队列中任务 |
STOP | 停止,不接受新任务,不执行队列中的任务,中止当前任务 |
TIDYING | 整理,所有任务结束,workerCount = 0,将调用 terminated 方法 |
TERMINATED | 结束,terminated 方法完成 |
AtomicInteger 使用 CAS 保证线程安全。
ThreadPoolExecutor 工作
一个任务的提交是从调用 execute 方法开始的,因此先从这里开始:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
主要是根据当前状态判断,
- 工作线程少于核心线程数:调用 addWorker 方法添加新的线程,成功后返回,否则更新一下当前 ctl ,然后进行下一步判断 。
- 如果当前处于运行态,则尝试将任务添加进任务队列并且添加成功:再次刷新当前 ctl,然后重新判断如果当前不处于运行态则尝试从任务队列删除,成功后则拒绝该任务,否则如果当前工作线程数量为 0 则调用
addWorker(null, false);
启动一个核心线程 - 否则(不处于运行态,或添加任务队列失败):则调用 ·
addWorker(command, false)
尝试添加新的非核心线程,如果失败则拒绝
之前我们说当线程数量达到核心线程数时,只有在任务队列满的时候才会创建新线程,就是因为这里的判断。
从以上代码,我们可以看到创建新的线程需要调用 addWorker 方法,而参数表示是否以核心线程添加,因此来到 addWorker 方法:
Worker 是线程的一种抽象,一个 Worker 对应一个线程,因此只有该任务需要新建线程执行或新建核心线程的时候需要调用 addWorker 方法,通常情况下 Worker 会自己获取任务队列第一个任务执行。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
代码挺长,主要分为两部分:
第一部分为两重循环,通过自旋 CAS 操作等判断当前是否可以添加 任务,可以则调用 break retry
跳出外层循环,不可以则直接 return false
。
第二部分为构建 Worker 对象并加入工作队列,同时开始工作,这里使用了 mainLock 来确保同步 。
先来看看 Worker 对象:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
@SuppressWarnings("serial") // Unlikely to be serializable
final Thread thread;
/** Initial task to run. Possibly null. */
@SuppressWarnings("serial") // Not statically typed as Serializable
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
可以看到这里继承了 AQS 实现同步操作,同时实现了 Runnable 接口,来看看 run 方法:
public void run() {
runWorker(this);
}
最终会来到 runWorker 方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
可以看到主体是一个死循环,不断调用 getTask 获取任务并执行,期间进行各种状态判断与钩子的调用 。同时还使用了同步锁保护其中变量(Worker 自己实现了 AQS 锁)。
可以再来看看 getTask
方法:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
还是自旋 CAS 判断状态,然后从任务队列中获取一个任务 。注意这里
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
如果当前线程数量大于核心线程数或者需要控制核心线程空闲时间,则调用 workQueue.poll 带超时参数的方法,超时时间为 keepAliveTime ,否则直接调用 workQueue.take();
ThreadPoolExecutor 与 FutureTask
除了 execute 方法可以提交任务,我们还可以调用 submit 方法,这是 抽象类 AbstractExecutorService
提供的方法:
而 ThreadPoolExecutor 继承于该抽象类,这里来看看其 submit 实现,以 Callable 为例:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
最终还是调用 execute 方法,不过使用了 Future 包装了一下,来到 newTaskFor 方法:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
熟悉的 FutureTask 类,之前以及介绍过了。