Skip to content

线程池的执行

执行流程

image-20250528180300320

  1. 根据初始化参数创建线程池,刚创建时,线程池内没有线程
  2. 当有新的任务提交到线程池的时候,会立即新增线程执行任务。
  3. 运行线程数 = 核心线程数时,这时进来的任务会被添加到任务队列中,而线程会从任务队列中获取任务执行。
  4. 运行线程数 = 核心线程数任务队列已满,这时候会在线程池中创建新线程来执行任务。
  5. 运行线程数 = 最大线程数且任务队列已满,此时会执行线程池对应的拒绝策略
  6. 当任务队列中没有任务,且线程等待时间超过空闲时间,则该线程会被回收。最终线程池中的线程数量会保持在核心线程数的大小

源码分析

  1. 在调用线程池执行方法 execute(Runnable command) 中,有对应逻辑判断。

    java
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            
            int c = ctl.get();
          	//1.若当前运行线程数 < 核心线程数
            if (workerCountOf(c) < corePoolSize) {
              	//新建线程执行任务 
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
          	//2. 运行线程数 >= 核心线程数时;workQueue.offer(command) 将任务放入阻塞队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
          	//3. 任务队列已满,无法放入时。则新建线程执行任务(运行线程数 < 最大线程数)
            else if (!addWorker(command, false))
              	//4. 若任务队列已满。新建线程失败(运行线程数 = 最大线程数)。执行拒绝策略
                reject(command);
        }
  2. addWorker(Runnable firstTask, boolean core) 方法用来创建线程任务。前半部分代码用来检查线程状态和判断线程池内的线程数量。

    java
    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 1. 检查线程池状态
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    						
                for (;;) {
                    int wc = workerCountOf(c);
                  	//2. core 分别指定核心线程数和最大线程数。(若大于等于,则添加失败。)
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            ......
            //第二部分 创建线程任务  
        }
    • 判断线程池状态

      当线程池状态为 SHUTDOWN 时,不再创建线程任务。

    • 判断线程池内的线程数量

      execute(Runnable command) 对应。

      core=true 时,当线程数量大于等于核心线程数时,不允许新建线程任务。

      core=false 时,当线程池大于等于最大线程数时,不允许新建线程任务。

  3. addWorker(Runnable firstTask, boolean core) 方法用来创建线程任务,后半部分代码主要用来创建线程任务。

    • 创建 Worker 对象。
    • 实例化 Thread 对象。
    • 启动线程。
    java
    private boolean addWorker(Runnable firstTask, boolean core) {
            
      			//第一部分
      		  ......
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
              	//1.创建一个Worker对象
                w = new Worker(firstTask);
              	//2. 实例化线程对象
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                      	//3. 启动线程
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
  4. Worker 类。

    Worker 类是一个线程类,任务启动时调用了 runWorker(Worker w) 方法。

    java
        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
           
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
              	//新建线程
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
              	//调用 runWorker 方法
                runWorker(this);
            }
  5. runWorker(Worker w) 方法

    • 线程首次调用时,会指定任务,之后都需要从任务队列中取任务来执行。
    • getTask() 方法从任务队列取任务。
    • task.run() 线程执行。
    java
       final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
         		//线程首次创建时,会指定任务(之后都需要从任务队列取任务来执行)
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
              	//getTask() 从队列获取任务
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                          	//任务执行
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
  6. getTask 方法

    java
        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
              	// 检查线程池状态(SHUTDOWN:等待任务队列任务全部完成后线程退出;STOP:线程直接退出;)
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    						//工作线程数
                int wc = workerCountOf(c);
    
                // 判断线程是否回收(线程数量>核心线程数时,若线程空闲,则需要回收空闲线程,但是要控制线程数量不低于当前线程)
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                  	//获取任务
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }

    从任务队列获取任务时:

    • 若当前线程数 > 核心线程数;

      workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)

      调用 poll 方法获取任务,并设置超时时间,若在指定时间未获取到任务,则设置超时。

    • 若当前线程 <= 核心线程数;

      调用 take 方法开始阻塞获取任务。(保证线程数量不低于核心线程数)

    java
     				// 当前线程数 > 核心线程数 => true
    				// 其它 => false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
    				try {
                  	//获取任务
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
             }

常见问题

线程池如何保证运行线程数量不低于核心线程数?

在线程回收的时候。若运行线程数量 < 核心线程数;

则会调用 take 方法阻塞从任务队列获取任务,直到任务队列有新增任务,才会继续执行,而不是直接销毁线程。

从任务队列获取任务时:

  • 若当前线程数 > 核心线程数;

    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)

    调用 poll 方法获取任务,并设置超时时间,若在指定时间未获取到任务,则设置超时。

  • 若当前线程 <= 核心线程数;

    调用 take 方法开始阻塞获取任务。(保证线程数量不低于核心线程数)

    take 等待队列中有数据进来,获取数据然后执行。

    java
    			// 当前线程数 > 核心线程数 => true
    			// 其它 => false
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
    
    			try {
              	//获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
         }

参考链接