Java线程池及原理

2021/12/13

Java语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。简单地说,线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。所以线程池逃不开两个东西,队列跟线程。

线程池使用

JAVA中创建线程池主要有两类方法,一类是通过Executors工厂类提供的方法,该类提供了4种不同的线程池可供使用。另一类是通过ThreadPoolExecutor实现类进行自定义创建,而Exectores工厂类最终也是创建ThreadPoolExecutor,所以先看看ThreadPoolExecutor:

利用ThreadPoolExecutor创建线程池

ThreadPoolExecutor 继承与 AbstractExecutorService,ExecutorService其实就是Executor体系里最核心的玩意儿,而ThreadPoolExecutor本身可以直观上看做线程池的本体。直接看下ThreadPoolExecutor构造函数

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

ThreadPoolExecutor有7个参数,参数之间可能会有影响:

  • corePoolSize核心线程池大小,默认情况下,即使它们处于idle状态也不销毁,除非用户设置了allowCoreThreadTimeOut,设置后,核心线程允许超时,超时时间就是keepAliveTime*unit,在这种情况下,核心线程数量可以缩减,甚至为0,一般来说核心线程所有自己的标记 worker创建的时候,会设置。

        public void allowCoreThreadTimeOut(boolean value) {
          if (value && keepAliveTime <= 0)
              throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
          if (value != allowCoreThreadTimeOut) {
              allowCoreThreadTimeOut = value;
              if (value)
                  interruptIdleWorkers();
          }
      }
    
  • maximumPoolSize线程池中最大的存活线程数,对于超出corePoolSize部分的线程,如果处于空闲状态,都会超时机制,超时时间keepAliveTime*unit。
  • keepAliveTime unit 共同定义超时时间
  • workQueue【BlockingQueue】作用就是让暂时无法获取线程的任务进入队列,等待执行,当调用execute【最终调用】方法时,如果线程池中没有空闲可用线程,任务就会入队,采用的队列不同,发生的效果也不同
	ArrayBlockingQueue	一个由数组结构组成的有界阻塞队列。
	LinkedBlockingQueue	一个由链表结构组成的可选有界阻塞队列。
	SynchronousQueue	一个不存储元素的阻塞队列,**即亲手直接提交给线程不保持它们**。 必须亲手直接给,要么就等、要么不给
	PriorityBlockingQueue	一个支持优先级排序的无界阻塞队列。
	DelayQueue	一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
	LinkedTransferQueue	一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
  • threadFactory 【ThreadFactory】线程工厂类,一般都是默认Executors.defaultThreadFactory()
  • handler【RejectedExecutionHandler】 这个参数是用来执行拒绝策略的,当提交任务时既没有空闲线程,任务队列也满了,就会执行拒绝操作,比如ArrayBlockingQueue、或者设定了容量的LinkedBlockingQueue,PriorityBlockingQueue是无限队列,不会发生拒绝。

利用Executors工厂创建的线程池有如下三种

  • FixedThreadPool:线程数固定的线程池;
  • CachedThreadPool:线程数根据任务动态调整的线程池; 理论上无限大
  • SingleThreadExecutor:仅单线程执行的线程池。

上面三种内部用的都是ThreadPoolExecutor

/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue, and uses the provided ThreadFactory to
 * create a new thread when needed. Unlike the otherwise
 * equivalent {@code newFixedThreadPool(1, threadFactory)} the
 * returned executor is guaranteed not to be reconfigurable to use
 * additional threads.
 *
 * @param threadFactory the factory to use when creating new
 * threads
 *
 * @return the newly created single-threaded Executor
 * @throws NullPointerException if threadFactory is null
 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

不过用FinalizableDelegatedExecutorService封装了一下

static class FinalizableDelegatedExecutorService
    extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    protected void finalize() {
        super.shutdown();
    }
}

应该是JVM为了防止浪费,在GC前利用 finalize将线程池关闭,回收资源。

 /**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available.  These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newCachedThreadPool有些特殊,它构造的ThreadPoolExecutor采用的是SynchronousQueue,比较适合执行高频、轻量级任务,它不会存储任务队列,如果有可复用的线程可用就直接用,否则创建新线程使用,不存在入队操作。

ThreadPoolExecutor运行原理

构造ThreadPollExecutor之后就处于Runing状态,可以提交任务了,核心函数是execute,虽然也有submit等操作,但只是利用FutureTask封装了一下,但核心还是封装成execute

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

execute 的执行与线程池的扩展

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
     <!--获取原子操作类Int-->
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

有一个很关键的变量,ctl

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 一开始就是RUNING状态


/**
 * The main pool control state, ctl, is an atomic integer packing
 * two conceptual fields
 *   workerCount, indicating the effective number of threads
 *   runState,    indicating whether running, shutting down etc
 *
 * In order to pack them into one int, we limit workerCount to
 * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
 * billion) otherwise representable. If this is ever an issue in
 * the future, the variable can be changed to be an AtomicLong,
 * and the shift/mask constants below adjusted. But until the need
 * arises, this code is a bit faster and simpler using an int.
 *
 * The workerCount is the number of workers that have been
 * permitted to start and not permitted to stop.  The value may be
 * transiently different from the actual number of live threads,
 * for example when a ThreadFactory fails to create a thread when
 * asked, and when exiting threads are still performing
 * bookkeeping before terminating. The user-visible pool size is
 * reported as the current size of the workers set.
 *
 * The runState provides the main lifecycle control, taking on values:
 *
 *   RUNNING:  Accept new tasks and process queued tasks
 *   SHUTDOWN: Don't accept new tasks, but process queued tasks
 *   STOP:     Don't accept new tasks, don't process queued tasks,
 *             and interrupt in-progress tasks
 *   TIDYING:  All tasks have terminated, workerCount is zero,
 *             the thread transitioning to state TIDYING
 *             will run the terminated() hook method
 *   TERMINATED: terminated() has completed

clt内含两个概念:workerCount:有效的线程数 runState:线程池的五种状态,Running、Shutdown、Stop、Tidying、Terminate, runState用int的高3位来表示,workCount用低29位标识,有效线程数最多为2^29-1。

execute第一步

如果核心线程数还未达到,则直接尝试添加新线程,添加新线程的时候,不涉及使用ThreadPoolExecutor中的Queue,不过后续第二任务可能就涉及

/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
                <!--这里可能有多个线程竞争,都想修改ctl, 其次也可能有线程终止->
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl 
            if (runStateOf(c) != rs)  //线程池状态变了
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
	<!--真正开始新建任务 这里是worker-->
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker继承了AbstractQueuedSynchronizer,同时实现了Runnable,并且自身是个Loop

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

从构造函数可以出,Worker利用ThreadFactory直接新建了一个Thread

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

run调用的runWorker是外部函数,这里有个需要注意的点是 每个Worker在执行前都加锁,已有效中断或者不让中断

2Before running any task, the lock is acquired to prevent other pool interrupts while the task is executing, and then we ensure that unless pool is stopping, this thread does not have its interrupt set.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
    <!--第一个任务肯定不为null,后续通过getTask获取-->
        while (task != null || (task = getTask()) != null) {
        <!--这里是防止执行任务的worker不被中断回收-->
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
    <!--没有任务执行的时候,就需要尝试主动退出-->
        processWorkerExit(w, completedAbruptly);
    }
}

这个锁可以防止正在执行的Worker被回收,比如调用shutdown,

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

shutdown不会让线程池立即终止,而是会等待任务执行完,执行任务的Worker当然不能中断,那么这个锁就会发挥一定作用

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            <!-- 未被中断,并且获取锁成功,不想中断正在执行的任务-->
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

除了以第一个Task,其余的Task通过getTask获取

 private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

getTask就会用到之前的BlockQueue了,根据是否超出 allowCoreThreadTimeOut与 corePoolSize可以选择用阻塞等待还是超市等待,阻塞就用take,非阻塞【可设定超时时间】就用poll,到这里整个线程池就算运转起来了。回到exectue,这里才走完第一步,来看看第二步。

exectue第二步

如果corePoolSize的限制已经达到,核心线程的数量已经足够了,后续如何?

  • 2If a task can be successfully queued, then we still need
  • to double-check whether we should have added a thread
  • (because existing ones died since last checking) or that
  • the pool shut down since entry into this method. So we
  • recheck state and if necessary roll back the enqueuing if
  • stopped, or start a new thread if there are none.
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }

isRunning(c)再次判断线程池是否还在运行中,然后利用workQueue.offer判断任务是否插入成功,之后再次检查,看看线程池是否关闭,如果关闭,重新移除任务,并执行reject操作,否则检查线程数是否为0,这个意思是可能压根就没设定核心线程数,或者核心线程也正好缩减到0,如果是这样的话,可以立即开启一个新的Worker线程,这一步的主体是workQueue.offer,添加任务,注意是先添加,而不是先扩展线程池,因为corePoolSize本身就隐含了权衡的意义,超出只是暂时的,吞吐量可能不会那么高。

exectue第三步:BlockQueue队列满了,扩展线程池

  1. If we cannot queue task, then we try to add a new thread. If it fails, we know we are shut down or saturated and so reject the task.
   else if (!addWorker(command, false))
        reject(command);

只有在BlockQueue队列满的情况下才会扩展线程池,这个顺序很重要,像PriorityBlockingQueue、或者设定容量很大的LinkBlockQueue、ArrayBlockQueue,可能也不糊走到这步。这与第一步的区别是,addworker的第二个参数是false,在判断是否可以继续扩展线程池的时候用的就是maximumPoolSize数值,其他不变

                 wc >= (core ? corePoolSize : maximumPoolSize))

如果这一步扩展线程池失败,那么只能走reject逻辑。

线程池的缩减

开线程池的缩减如无主动调用,线程池会自动调整,还记得上面Worker的getTask吗

private Runnable getTask() {
			...
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();

在BlockQueue利用poll(keepAliveTime, TimeUnit.NANOSECONDS) 失败的情况下,其实就可以看做是超时了,就可以对线程进行清理了。也就是runWorker会走进processWorkerExit分支进行清理

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
        ..
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

processWorkerExit会将worker进行回收,其实runWorker执行结束,这个loop也算结束了,线程会自动终结的

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    <!--works hanshmap中移除-->
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
	<!--是否需要终结线程池 一般不会-->
    tryTerminate();
 
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        <!--如果不小心,太少了,则重新开启-->
        addWorker(null, false);
    }
}

tryTerminate不是用来终结线程的,而是看看是不是需要关闭线程池,

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

比如,调用了shutdown,但是队列还不是空,则不终结,如果为空了,也没什么等待的,可以尝试终结线程池。

线程池的关闭

线程池状态的流转从RUNNING开始,到TERMINATED结束,

流转

  • RUNNING:初始化状态是RUNNING,能够接收新任务,以及对已添加的任务进行处理。
  • SHUTDOWN 调用shutdown()后,线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
  • STOP 调用shutdownNow()后 ,不接收新任务,不处理已添加的任务,并且会尝试中断正在处理的任务。
  • TIDYING 当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态,同时会执行钩子函数terminated()
  • TERMINATED terminated()执行完毕,线程池彻底终止,就变成TERMINATED状态

shutdown()方法与shutdownNow的区别

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        <!--中断没用的-->
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

    private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        <!--尝试中断所有-->
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    for (Worker w : workers)
        w.interruptIfStarted();
} finally {
    mainLock.unlock();
} }
  • 首先体现在是否让等待队列的任务执行完毕
  • 其次shutdownNow会尝试终止正在执行的任务,但不一定成功。
  • 最后,两者调用后一个变成SHUTDOWN状态,一个变成STOP状态

submit与callable

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

ThreadPoolExecutor的submit函数会先封装一个Future类,实现是FutureTask,之后利用execute提交,最后返回这个Task的引用,调用者,可以利用Future的get函数,阻塞等待结果,一般而言FutureTask本身内含阻塞操作,

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

awaitDone

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

线程构建WaitNode,并将其添加到一个等待队列,然后利用LockSupport挂起自己,等待唤醒,在哪里唤醒,线程池执行任务会调用任务的run函数

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

run函数在执行完,call,并获结果后,会利用set函数设置result,在这个函数中会调用finishCompletion唤起等地的线程

    protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

    private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

唤起操作很直接,直接调用LockSupport的LockSupport.unpark函数。

参考文档

https://www.cnblogs.com/pcheng/p/13540619.html

Search

    Table of Contents