JUC - ThreadPoolExecutor.md

903

JUC 系列之 ThreadPoolExecutor

ThreadPoolExecutor 使用

该类有四个构造方法:

image20210728210814078.png

来看看第四个最复杂的:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

这里给出含义:

序号名称类型含义
1corePoolSizeint核心线程数量
2maxmumPoolSizeint最大线程数量
3keepAliveTimelong线程最大空闲时间
4unitTimeUnit时间单位
5workQueueBlockingQueue<Runnable>线程等待队列(存放任务)
6threadFactoryThreadFactory线程创建工厂
7handlerRejectedExecutionHandler拒绝策略

当一个任务提交到线程池,线程池会根据当前状态决定任务的运行线程:

  • 若当前少于 corePoolSize 数量的线程正在运行,则一定会新建线程运行
  • 当多于 corePoolSize 少于 maxmumPoolSize 数量的线程正在运行,则仅当队列已满时才会新建线程(任务队列已经放不下了,必须执行)
  • 当多于 maxmunPoolSize 数量的线程正在运行,且任务队列已满,则会拒绝该任务,最终处理交给拒绝策略

此外,可以调用 prestartCoreThread() 预创建一个空闲任务核心线程或调用 prestartAllCoreThreads() 预创建 corePoolSize 个核心线程。

当需要创建要给新线程,需要调用创建工厂的 newThread 方法,在这里可以指定线程的名称,线程组,优先级等信息。

如果线程池当前拥有超过 corePoolSize 的线程,那么多余的线程在空闲时间超过 keepAliveTime 时会被终止。可以调用 allowCoreThreadTimeOut(boolean) 指定是否需要在超时之后将核心线程也终止。

当提交的任务无法被马上执行后,任务会加入任务队列,这里可以使用任何 BlockingQueue 的子类,当然有几个默认队列在之后会介绍。

有两种情况下任务会被拒绝:

  • 线程池已经关闭
  • 线程数量已经到达 maxmumPoolSize 并且没有空闲,同时任务队列已满

当一个任务被拒绝,会调用 RejectedExecutionHandler 的 rejectedExecution 方法,我们可以自定义自己的拒绝策略,也可以使用默认的几种,在之后将会介绍 。

当我们构造一个 ThreadPoolExecutor 之后,我们可以调用 execute(Runable) 提交任务

ThreadPoolExecutor 默认 实现

JUC 中有 ThreadPoolExecutor 的一些默认实现,在 Executors 类中,来分别介绍:

FixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • corePoolSize 于 maximumPoolSize 相等,所有线程都是核心线程
  • 使用 LinkedBlockingQueue 作为任务队列,其空间最大为 Integer.MAX_VALUE,几乎在占满之前内存已经溢出 。
  • 任务执行无序

CachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • 核心线程为 0 ,即所有线程都会受到 KeepAliveTime 的限制
  • maximunPoolSize 为 Integer.MAX_VALUE,几乎为无限制
  • KeepAliveTime 为 60 s
  • 使用 SynchronousQueue 同步队列作为任务队列,入队后需要出队后才可继续入队,因为使用该线程池几乎不会有任务需要在队列等待(线程最大值为 Int 最大值),因此直接使用同步队列

SingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

本质上是一个 newFixedThreadPool(1),但其使用了 FinalizableDelegatedExecutorService 类包装,保证其线程数量只能为 1 不能修改

WorkStealingPool

   public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

其使用的是 ForkJoinPool 将在之后介绍

ScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

其使用的是 ScheduledThreadPoolExecutor ,为 ThreadPoolExexutor 的一个派生类,将在之后介绍。

自定义 ThreadPoolExecutor

自定义 ThreadPoolExecutor 需要确定要使用的 任务队列,线程工厂与拒绝策略,这里分别介绍

任务队列总体来说有三种策略

  1. 直接握手

一个很好的使用该策略的任务队列是 SynchronousQueue,相当于没有任务队列,所有任务都会尝试新建线程,如果线程数量大于 maximunPoolSize 则会直接拒绝。

  1. 无界队列

一个很好的使用该策略的任务队列是 LinkedBlockingQueue(其界限为 Integer.MAX_VALUE,约等于无界),使用该队列时,maximunPoolSize 将失去作用,因为当线程数量大于 corePoolSize 时必须要队列满才会创建新线程,因此对于无界队列,相当于线程最大大小限制为 corePoolSize

  1. 有界队列

一个很好的使用该策略的任务队列是 ArrayBlockingQueue,使用该队列配合 maximunPoolSize 能够有助于提高效率,不过其值比较难以控制,需要相互权衡。

可以实现自己的拒绝策略,也可以是默认实现的几种:

  • AbortPolicy:抛出 RejectedExecutionException 异常,当不指定时默认
  • CallerRunsPolicy:如果线程池还没关闭,则直接运行失败的任务(不放入线程池),如果线程池已经关闭,则丢弃
  • DiscardPolicy:直接丢弃
  • DiscardOldestPolicy:丢弃队头任务,并再次尝试

此外还可以实现自己的策略:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

对于线程工厂,可以使用默认工厂:

    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

当需要自定义时,直接实现 ThreadFactory 以及 newThread 方法即可 。

ThreadPoolExecutor 状态

ThreadPoolExecutor 的状态使用一个 AtomicInteger 的 ctl 变量表示,使用了状态压缩,其中高 3 位表示 5 种状态,低 29 位表示线程数量,相关变量和方法如下:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

其中五种状态如下:

状态说明
RUNNING运行,可以处理新任务与任务队列中的任务
SHUTDOWN关闭,不接受新任务,但执行队列中任务
STOP停止,不接受新任务,不执行队列中的任务,中止当前任务
TIDYING整理,所有任务结束,workerCount = 0,将调用 terminated 方法
TERMINATED结束,terminated 方法完成

AtomicInteger 使用 CAS 保证线程安全。

ThreadPoolExecutor 工作

一个任务的提交是从调用 execute 方法开始的,因此先从这里开始:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

主要是根据当前状态判断,

  • 工作线程少于核心线程数:调用 addWorker 方法添加新的线程,成功后返回,否则更新一下当前 ctl ,然后进行下一步判断 。
  • 如果当前处于运行态,则尝试将任务添加进任务队列并且添加成功:再次刷新当前 ctl,然后重新判断如果当前不处于运行态则尝试从任务队列删除,成功后则拒绝该任务,否则如果当前工作线程数量为 0 则调用 addWorker(null, false); 启动一个核心线程
  • 否则(不处于运行态,或添加任务队列失败):则调用 ·addWorker(command, false) 尝试添加新的非核心线程,如果失败则拒绝

之前我们说当线程数量达到核心线程数时,只有在任务队列满的时候才会创建新线程,就是因为这里的判断。

从以上代码,我们可以看到创建新的线程需要调用 addWorker 方法,而参数表示是否以核心线程添加,因此来到 addWorker 方法:

Worker 是线程的一种抽象,一个 Worker 对应一个线程,因此只有该任务需要新建线程执行或新建核心线程的时候需要调用 addWorker 方法,通常情况下 Worker 会自己获取任务队列第一个任务执行。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

代码挺长,主要分为两部分:

第一部分为两重循环,通过自旋 CAS 操作等判断当前是否可以添加 任务,可以则调用 break retry 跳出外层循环,不可以则直接 return false

第二部分为构建 Worker 对象并加入工作队列,同时开始工作,这里使用了 mainLock 来确保同步 。

先来看看 Worker 对象:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        @SuppressWarnings("serial") // Unlikely to be serializable
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        @SuppressWarnings("serial") // Not statically typed as Serializable
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        // TODO: switch to AbstractQueuedLongSynchronizer and move
        // completedTasks into the lock word.

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

可以看到这里继承了 AQS 实现同步操作,同时实现了 Runnable 接口,来看看 run 方法:

public void run() {
    runWorker(this);
}

最终会来到 runWorker 方法:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

可以看到主体是一个死循环,不断调用 getTask 获取任务并执行,期间进行各种状态判断与钩子的调用 。同时还使用了同步锁保护其中变量(Worker 自己实现了 AQS 锁)。

可以再来看看 getTask 方法:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

还是自旋 CAS 判断状态,然后从任务队列中获取一个任务 。注意这里

Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();

如果当前线程数量大于核心线程数或者需要控制核心线程空闲时间,则调用 workQueue.poll 带超时参数的方法,超时时间为 keepAliveTime ,否则直接调用 workQueue.take();

ThreadPoolExecutor 与 FutureTask

除了 execute 方法可以提交任务,我们还可以调用 submit 方法,这是 抽象类 AbstractExecutorService 提供的方法:

image20210729110819443.png

而 ThreadPoolExecutor 继承于该抽象类,这里来看看其 submit 实现,以 Callable 为例:

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

最终还是调用 execute 方法,不过使用了 Future 包装了一下,来到 newTaskFor 方法:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

熟悉的 FutureTask 类,之前以及介绍过了。