Skip to content

线程池的关闭

线程池的关闭方式有两种,一种是调用 shutdown() 方法,另一种是调用 shutdownNow() 方法。

  • shutdown

    调用该方法后,线程池拒绝接受新提交的任务,同时等待线程池里和任务队列中的任务执行完毕再关闭线程。

  • shutdownNow

    调用该方法后,线程池拒绝接受新提交的任务,不再执行任务队列的任务,将线程池任务队列里的任务全部返回。

shutdown

调用该方法后,线程池拒绝接受新提交的任务,同时等待线程池里和任务队列中的任务执行完毕再关闭线程。

java
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
      	//加锁
        mainLock.lock();
        try {
          	//检查权限
            checkShutdownAccess();
          	//设置线程池的状态
            advanceRunState(SHUTDOWN);
          	//中断空闲线程(不能中断正在运行的线程)
            interruptIdleWorkers();
          	//钩子函数,用来清理一些资源
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

过程分析

  1. 加锁。

  2. 检查是否有执行的权限。

  3. 将线程池的状态更改为 SHUTDOWN。在这之后线程池不再接受新任务。

    java
        private void advanceRunState(int targetState) {
            for (;;) {
              	//当前状态
                int c = ctl.get();
              	//若当前状态>=更新状态 =》 break ,不允许线程池更改状态
              	//若当前状态<更新状态;将线程池状态修改为更新状态。比如 RUNNING < SHUTDOWN时,线程池状态会修改为 SHUTDOWN
                if (runStateAtLeast(c, targetState) ||
                    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                    break;
            }
        }
    		
    		//当前状态,更新状态
        private static boolean runStateAtLeast(int c, int s) {
          	//当前状态>=更新状态
            return c >= s;
        }
  4. 中断空闲线程。

    该方法只会中断空闲线程,不会中断正在执行的线程。

    因为正在执行的线程还需要将线程池和队列中的任务全部执行完,才可以关闭。

    java
        private void interruptIdleWorkers() {
            interruptIdleWorkers(false);
        }
    
        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
              	//遍历工作线程
                for (Worker w : workers) {
                    Thread t = w.thread;
                  	//线程不中断 && 获取线程的锁(正在执行的线程会占用锁,故该方法不能中断正在运行的线程)
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                          	//中断线程
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }

    参考扩展分析 - 2.shutdown方法调用后怎样等待任务队列任务执行完成?

扩展分析

  1. 执行 shutdown 方法后再新增任务会发生什么?

    执行 shutdown 方法后,若还继续向线程池提交任务的话,线程池会检查线程池的状态,线程池状态不是 RUNNING 状态,会执行线程池的拒绝策略(发生在线程池提交任务的方法 execute 方法 )。

    java
    // runState is stored in the high-order bits
         private static final int RUNNING    = -1 << COUNT_BITS;
         private static final int SHUTDOWN   =  0 << COUNT_BITS;
         private static final int STOP       =  1 << COUNT_BITS;
         private static final int TIDYING    =  2 << COUNT_BITS;
         private static final int TERMINATED =  3 << COUNT_BITS;
       
        public void execute(Runnable command) {
             if (command == null)
                 throw new NullPointerException();
             
             int c = ctl.get();
             if (workerCountOf(c) < corePoolSize) {
                 if (addWorker(command, true))
                     return;
                 c = ctl.get();
             }
             //判断线程状态
          		//isRunning(c) = > false
             if (isRunning(c) && workQueue.offer(command)) {
                 int recheck = ctl.get();
                 if (! isRunning(recheck) && remove(command))
                     reject(command);
                 else if (workerCountOf(recheck) == 0)
                     addWorker(null, false);
             }
            //其他状态添加任务失败后,执行拒绝策略。
          	 //addWorker(command, false) => false
             else if (!addWorker(command, false))
               	//执行拒绝策略
                 reject(command);
         }
       
        //判断线程池状态是否为 RUNNING
    	 //return false
        private static boolean isRunning(int c) {
             return c < SHUTDOWN;
        }
       
        private boolean addWorker(Runnable firstTask, boolean core) {
           retry:
           for (;;) {
               int c = ctl.get();
               int rs = runStateOf(c);
       
               // (true && !(true && false && true))
             	// (true && true) => true
             	//return false
               if (rs >= SHUTDOWN &&
                   ! (rs == SHUTDOWN &&
                      firstTask == null &&
                      ! workQueue.isEmpty()))
                   return false;
       
              ......
        }
  2. shutdown 方法调用后怎么等待任务全部完成?

    在调用 shutdown 方法后,线程池对新增的任务执行拒绝策略,但是会等待线程池和任务队列里的任务执行完毕。

    该机制的实现发生在线程池内执行任务的方法 runWordker() 里。

    可以看到 runWorker 方法里有加锁和解锁,对应 shutdown 方法只能中断空闲线程的原因。

    java
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
          	//firstTask 首次运行任务时不为空,之后运行为空。(从任务队列取任务)
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
              	// 获取任务,非首次执行 getTask() 方法获取任务(getTask()==null时任务类也为空)
                while (task != null || (task = getTask()) != null) {
                  	//当前线程加锁
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                          	//调用线程
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                      	//当前线程解锁
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    		//获取任务
        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
              	// 判断线程池状态和任务队列(主要)
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                  	//线程任务置空
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                  	//从任务队列取任务
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }

    实现的主要方式对应 getTask() 方法的如下片段:

    java
     // Check if queue empty only if necessary.
     // 判断线程池状态和任务队列(主要)
     if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
         decrementWorkerCount();
         return null;
     }
    • 当线程池状态为 SHUTDOWN 时,只有当任务队列为空时,才会不再执行。

      表达式对应 :true && (false || workQueue.isEmpty()),这里可以看出只有当 workQueue.isEmpty() 成立时才会将置空线程,即任务队列为空时才会停止任务执行。

    • 当线程状态为 STOP 时,不论任务队列是否为空,都不再执行。(对应 shutdownNow 方法实现)

      表达式对应 :true && (true || workQueue.isEmpty()),这里可以看出不管任务队列是否为空,线程任务都不再执行。 ​

shutdownNow

调用该方法后,线程池拒绝接受新提交的任务,不再执行任务队列的任务,将线程池任务队列里的任务全部返回。

java
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          	//检查权限
            checkShutdownAccess();
          	//设置线程池的状态
            advanceRunState(STOP);
          	//中断线程
            interruptWorkers();
          	//设置任务队列
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
		
		//返回任务队列
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

过程分析

  1. 加锁。

  2. 检查是否有可执行的权限。

  3. 设置线程池状态为 STOP(类比 shutdown 对应方法)。

  4. 中断线程。

    中断所有运行线程。和 shutdown 的 方法相比,该方法没有尝试获取线程内部锁,而该锁限制的是不中断正在执行任务的线程。

    所以 shutdownNow 执行后,会将执行任务的线程一并中断。

    java
        private void interruptWorkers() {
            final ReentrantLock mainLock = this.mainLock;
          	//加锁(可冲入)
            mainLock.lock();
            try {
             		//中断所有运行线程
                for (Worker w : workers)
                  	//中断运行中的线程
                    w.interruptIfStarted();
            } finally {
                mainLock.unlock();
            }
        }
  5. 返回任务队列中的所有任务。

扩展分析

  1. 执行 shutdownNow 方法后再新增任务会发生什么?

    在调用 shutdownNow 方法后,线程池对新增的任务执行拒绝策略。(原理见 shutdown 扩展分析)

  2. 调用 shutdownNow 后,线程池不处理任务队列任务的机制?

    在调用 shutdownNow 方法后,线程池对新增的任务执行拒绝策略,同时不再执行任务队列里的任务。

    该机制的实现发生在线程池内任务执行的方法 runWordker() 里。

    实现的主要方式对应 getTask() 方法的如下片段:

    java
     // Check if queue empty only if necessary.
     // 判断线程池状态和任务队列(主要)
     if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
         decrementWorkerCount();
         return null;
     }
    • 当线程池状态为 SHUTDOWN 时,只有当任务队列为空时,才会不再执行。

      表达式对应 :true && (false || workQueue.isEmpty()),这里可以看出只有当 workQueue.isEmpty() 成立时才会将置空线程,即任务队列为空时才会停止任务执行。

    • 当线程状态为 STOP 时,不论任务队列是否为空,都不再执行。

      表达式对应 :true && (true || workQueue.isEmpty()),这里可以看出不管任务队列是否为空,线程任务都不再执行。

参考链接