侧边栏壁纸
博主头像
灬编程小子博主等级

写程序就是一个不断追求完美的过程

  • 累计撰写 4 篇文章
  • 累计创建 1 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

深入理解线程池

灬编程小子
2023-12-02 / 0 评论 / 1 点赞 / 97 阅读 / 24026 字

1. 为什么要引入线程池

在并发系统中,无限制的线程创建销毁,频繁的上下文切换带来的额外的资源消耗会引发资源耗尽的风险进而影响性能。

线程池的引入解决了资源管理的问题,线程池是池化技术的一种,通过池化预留资源,资源由池子统一管理,当系统需要线程来处理任务时,可以直接从池子获取资源,当任务处理完成后归还给池子,进而提高资源复用率。

在工作中我们难免会使用线程池,了解线程池的原理,能帮助我们更"安全"的使用它。

2. 线程池源码分析

首先来看下 JDK 提供的线程池框架的结构图

1-线程池框架结构.png

图中 Executor 为顶层接口,其中仅仅一行代码void execute(Runnable command); 下文分析。

ExecutorService 接口,在 Executor 接口的基础上添加了些方法。

AbstractExecutorService 抽象类为子类提供了些公用方法。

我们通常使用的线程池就是 ThreadPoolExecutor

下面我们来详细分析下线程池框架

2.1 Executor

Executor 接口仅仅一行代码,入参是一个线程,方法寓意提交一个任务,由此可见 Executor 的设计是将提交任务和后续执行任务拆开以解耦,这也是线程池的强大能力,你只管提交,剩下线程是如何被调度的,执行任务的过程全权交由线程池,你无需关注。

由于不需要获取结果,不会进行 FutureTask 的包装.

public interface Executor {
    void execute(Runnable command);
}

2.2 ExectorService

ExectorService 接口是对 Executor 接口的扩展,额外提供了很多其他函数,例如 submit(),shutDown()等,方法具体含义请看下述代码

public interface ExecutorService extends Executor {
<pre><code>// 关闭线程池,已提交的任务继续执行,不接受继续提交新任务 状态流转RUNNING -&gt; SHUTDOWN
void shutdown();

  // 关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务 它和shutdown方法相比,区别在于它会去停止当前正在进行的任务  状态流转(RUNNING or SHUTDOWN) -&gt; STOP
List&lt;Runnable&gt; shutdownNow();

  // 线程池是否已关闭
boolean isShutdown();

   // 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回true 
boolean isTerminated();

   // 等待所有任务完成,并设置超时时间 实际应用中是,先调用 shutdown 或 shutdownNow, 然后再调这个方法等待所有的线程真正地完成,返回值意味着有没有超时
boolean awaitTermination(long timeout, TimeUnit unit) 
        throws InterruptedException;

  // 提交一个 Callable 任务
&lt;T&gt; Future&lt;T&gt; submit(Callable&lt;T&gt; task); 
    
  // 提交一个 Runnable 任务,第二个参数将会放到 Future 中,作为返回值,因为 Runnable 的 run 方法本身并不返回任何东西
&lt;T&gt; Future&lt;T&gt; submit(Runnable task, T result); 
  
  // 提交一个 Runnable 任务
Future&lt;?&gt; submit(Runnable task); 

  // 执行所有任务,返回 Future 类型的一个 list
&lt;T&gt; List&lt;Future&lt;T&gt;&gt; invokeAll(Collection&lt;? extends Callable&lt;T&gt;&gt; tasks) 
        throws InterruptedException;

  // 执行所有任务,返回 Future 类型的一个 list,有超时时间
&lt;T&gt; List&lt;Future&lt;T&gt;&gt; invokeAll(Collection&lt;? extends Callable&lt;T&gt;&gt; tasks,
                              long timeout, TimeUnit unit)
        throws InterruptedException; 

    // 只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果
&lt;T&gt; T invokeAny(Collection&lt;? extends Callable&lt;T&gt;&gt; tasks)
        throws InterruptedException, ExecutionException;

  // 只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果,超过指定的时间,抛出 TimeoutException 异常
&lt;T&gt; T invokeAny(Collection&lt;? extends Callable&lt;T&gt;&gt; tasks,
                long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException; 

}

2.3 AbstractExecutorService

AbstractExecutorService 抽象类派生自 ExecutorService 接口,提供了公用方法给子类使用

public abstract class AbstractExecutorService implements ExecutorService {</p>
<pre><code>/**
 * 将任务包装成 FutureTask 提交到线程池中执行
 */
protected &lt;T&gt; RunnableFuture&lt;T&gt; newTaskFor(Runnable runnable, T value) { 
    return new FutureTask&lt;T&gt;(runnable, value);
}

/**
 * 将任务包装成 FutureTask 提交到线程池中执行
 */
protected &lt;T&gt; RunnableFuture&lt;T&gt; newTaskFor(Callable&lt;T&gt; callable) {
    return new FutureTask&lt;T&gt;(callable);
}

/**
 * 提交任务
 */
public Future&lt;?&gt; submit(Runnable task) {  
    if (task == null) throw new NullPointerException();
      // 将任务包装成 FutureTask
    RunnableFuture&lt;Void&gt; ftask = newTaskFor(task, null); 
    // 交给执行器执行,execute 方法由具体的子类来实现
    execute(ftask); 
    return ftask;
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public &lt;T&gt; Future&lt;T&gt; submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture&lt;T&gt; ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public &lt;T&gt; Future&lt;T&gt; submit(Callable&lt;T&gt; task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture&lt;T&gt; ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
    
  // 本文中 invokeAny 和 invokeAll 方法并非重点,这里省略

}

我们来看 submit 方法,方法重载,支持参数是 Runnable 或者 Callable,该方法定义为提交一个任务

,任务的定义在实现 Runnable 或者 Callable 接口的 run()或者 call()中。Callable 和 Runnable 的区别在于 run() 没有返回值,而 Callable 的 call() 方法有返回值,同时,如果运行出现异常,call() 方法会抛出异常。

具体的提交逻辑,是将任务包装成为一个 FutureTask 后调用子类的 execute 方法,也就是该方法由 ThreadPoolExecutor 具体实现。

  /**
* 提交任务
*/
public Future<?> submit(Runnable task) {<br />
if (task == null) throw new NullPointerException();
// 将任务包装成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 交给执行器执行,execute 方法由具体的子类来实现
execute(ftask);
return ftask;
}

2-线程类.png

FutureTask 通过 RunnableFuture 间接实现了 Runnable 接口,所以每个 Runnable 通常都先包装成 FutureTask,然后调用 executor.execute(Runnable command) 将其提交给线程池

总结来说需要获取结果(FutureTask),用 submit 方法,不需要获取结果,可以用 execute 方法。

2.4 ThreadPoolExecutor

ThreadPoolExecutor 是 JDK 中的线程池实现,它实现了任务提交、线程管理、监控等等方法。

ThreadPoolExecutor 的创建通过构造函数

  public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

其中简单看一下构造函数中各参数的含义

corePoolSize线程池核心线程大小

是线程池中的一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut,简单来说线程池分为两个部分,核心线程池和非核心线程池,核心线程池中的线程一旦创建便不会被销毁,非核心线程池中的线程在创建后如果长时间没有被使用则会被销毁。

maximumPoolSize线程池最大线程数量

整个线程池的大小,此值大于等于1。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。工作队列满,且线程数等于最大线程数,此时再提交任务则会调用拒绝策略。maximumPoolSize - corePoolSize = 非核心线程池的大小

keepAliveTime:多余的空闲线程存活时间

非核心线程池中的线程在 keepAliveTime 时间内没有被使用就会被销毁,时间单位由 TimeUnit unit 决定。

当线程空闲时间达到 keepAliveTime 值时,多余的线程会被销毁直到只剩下 corePoolSize 个线程为止。

TimeUnit unit:空闲线程存活时间单位

keepAliveTime的计量单位

BlockingQueue<Runnable> workQueue:任务队列

阻塞队列用来存储任务,当有新的请求线程处理时,如果核心线程池已满,新来的任务会放入 workQueue 中,等待线程处理,JUC提供的阻塞队列有很多,例 ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue 等

ThreadFactory:工厂类对象

线程池的创建传入了此参数时,是通过工厂类中的 newThread()方法来实现。

RejectedExecutionHandler handler:拒绝策略

如果线程池中没有空闲线程,已存在 maximumPoolSize 个线程,且阻塞队列 workQueue 已满,这时再有新的任务请求线程池执行,会触发线程池的拒绝策略,可以通过参数 handler 来设置拒绝策略,注意只有有界队列例如 ArrayBlockingQueue 或者指定大小的 LinkedBlockingQueue 等拒绝策略才有用,因为无解队列拒绝策略永远不会被触发。

THreadPoolExector 预先定义好了一些拒绝策略。

 public static class CallerRunsPolicy implements RejectedExecutionHandler {</p>
<pre><code>    public CallerRunsPolicy() { }
            
           // 只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务。
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

public static class AbortPolicy implements RejectedExecutionHandler {

    public AbortPolicy() { }
   
          // 不管怎样,直接抛出 RejectedExecutionException 异常 默认的策略,如果我们构造线程池的时候不传相应的 handler 的话,那就会指定使用这个
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

    // 不做任何处理,直接忽略掉这个任务
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

    public DiscardOldestPolicy() { }

    // 如果线程池没有被关闭的话, 把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}</code></pre><p style=""></p><p style="">接着我们来看下 TreadPoolExecutor 中核心的属性变量</p><p style="">TreadPoolExecutor 使用一个 32 位整数来存放线程池的状态和当前池中的线程数,其中高 3 位用于存放线程池状态,低 29 位表示线程数。通过位运算计算,相比于基本运算,增强了计算速度。</p><pre><code class="language-java">// 用此变量保存当前池状态(高3位)和当前线程数(低29位)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// COUNT_BITS 设置为 29(32-3),意味着前三位用于存放线程状态,后29位用于存放线程数 private static final int COUNT_BITS = Integer.SIZE - 3;

// 000 11111111111111111111111111111 这里得到的是 29 个 1,也就是说线程池的最大线程数是 2^29-1=536870911 private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 将整数 c 的低 29 位修改为 0,就得到了线程池的状态 private static int runStateOf(int c) { return c & ~CAPACITY; }

// 将整数 c 的高 3 为修改为 0,就得到了线程池中的线程数 private static int workerCountOf(int c) { return c & CAPACITY; }

线程池中的各个状态也是通过位运算计算

// 线程池的状态存放在高 3 位中 运算结果为 111跟29个0:111 00000000000000000000000000000 .接受新的任务,处理等待队列中的任务<br />
private static final int RUNNING    = -1 << COUNT_BITS;</p>
<p>// 000 00000000000000000000000000000 .不接受新的任务提交,但是会继续处理等待队列中的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;</p>
<p>// 001 00000000000000000000000000000 .不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
private static final int STOP       =  1 << COUNT_BITS;</p>
<p>// 010 00000000000000000000000000000 .所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
private static final int TIDYING    =  2 << COUNT_BITS;</p>
<p>// 011 00000000000000000000000000000 .terminated() 方法结束后,线程池的状态就会变成这个
private static final int TERMINATED =  3 << COUNT_BITS;

底层类型存储可以看这篇回忆类型底层存储

位运算可以看这篇回忆位运算

我们可以通过以下代码(输出 32 位二进制数值)来验证

3-ctl验证.png

我们来看 ThreadPoolExecutor 的 execute 方法

   public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 线程池状态和线程数的整数
int c = ctl.get();
// 如果当前线程数小于核心线程数,创建 Worker 线程并启动线程
if (workerCountOf(c) < corePoolSize) {
// 添加任务成功,那么就结束了 结果会包装到 FutureTask 中
if (addWorker(command, true))
return;
c = ctl.get();
}
// 要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了 ,如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
// 二次状态检查
int recheck = ctl.get();
// 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池还是 RUNNING 的,并且线程数为 0,重新创建一个新的线程 这里目的担心任务提交到队列中了,但是线程都关闭了
else if (workerCountOf(recheck) == 0)
// 创建Worker,并启动里面的Thread,为什么传null,线程启动后会自动从阻塞队列拉任务执行
addWorker(null, false);
}
// 如果 workQueue 队列满了,那么进入到这个分支 以 maximumPoolSize 为界创建新的 worker 线程并启动,如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);

简单总结下上述代码的逻辑

  • 当前线程数小于核心线程数,则创建一个新的线程来执行任务

  • 当前线程数大于等于核心线程数,且阻塞队列未满,则将任务添加到队列中

  • 如果阻塞队列已满,当前线程数大于等于核心线程数,当前线程数小于最大线程数,则创建并启动一个线程来执行新提交的任务(这里可以继续看下面的分析)

  • 若当前线程数大于等于最大线程数,且阻塞队列已满,此时会执行拒绝策略

二次检查是为了应对并发情况,从上次判断线程池状态到现在线程池可能会被关闭,由于线程池关闭后不能再继续添加任务了,此时就需要回滚刚才的添加任务到队列中的操作,并执行拒绝策略

引用美团线程池篇图 执行流程如下图所示

4-线程池执行顺序.png

来看线程 Worker 作为线程池中真正执行任务的线程,继承了抽象类 AbstractQueuedSynchronizer,用 AQS 来实现独占锁,为的就是实现不可重入的特性去反应线程现在的执行状态

 private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{</p>
<pre><code>    private static final long serialVersionUID = 6138294804551838833L;

    // 这个是真正的线程
    final Thread thread;

    // 这里的 Runnable 是任务 这个线程起来以后需要执行的第一个任务,那么第一个任务就是存放在这里的(线程可不止执行这一个任务)
    Runnable firstTask;
    
    // 用于存放此线程完成的任务数,注意了,这里用了 volatile,保证可见性
    volatile long completedTasks; 

    // Worker 只有这一个构造方法,传入 firstTask,也可以传 null
    Worker(Runnable firstTask) { 
        setState(-1); 
        this.firstTask = firstTask;
          // 调用 ThreadFactory 来创建一个新的线程
        this.thread = getThreadFactory().newThread(this); 
    }

 
    public void run() {
        runWorker(this);
    }

}

ThreadPoolExecutor 的 execute 方法中创建 Worker 就是调用了下面的 addWorker方法,该方法 2 个入参。

第一个参数是准备提交给这个线程执行的任务,当为 null 时,线程启动后会自动从阻塞队列拉任务执行。

引用美团线程池篇图 执行流程如下图所示

5-addWorker执行顺序.png

第二个参数为 true 代表使用核心线程数作为创建线程的界限,也就说创建这个线程的时候,如果线程池中的线程总数已经达到核心线程数,那么不能响应这次创建线程的请求 如果是 false,代表使用最大线程数 作为界限同理。

     // 这个是真正的线程
final Thread thread;</p>
<pre><code>  private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
                    
           // 当线程池处于 SHUTDOWN 的时候,不允许提交任务,但是已有的任务继续执行
        if (rs &gt;= SHUTDOWN &amp;&amp;
            ! (rs == SHUTDOWN &amp;&amp;
               firstTask == null &amp;&amp;
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc &gt;= CAPACITY ||
                wc &gt;= (core ? corePoolSize : maximumPoolSize))
                return false;
              // 如果成功,那么就是所有创建线程前的条件校验都满足了,准备创建线程执行任务了 这里失败的话,说明有其他线程也在尝试往线程池中创建线程 重试 continue retry;
            if (compareAndIncrementWorkerCount(c))
                break retry;
              // 由于有并发,重新再读取一下 ctl
            c = ctl.get(); 
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
            
    // worker 是否已经启动
    boolean workerStarted = false; 
    // 是否已将这个 worker 添加到 workers 这个 HashSet 中
    boolean workerAdded = false; 
    Worker w = null;
    // 可以开始创建线程来执行任务了
    try { 
          // 把 firstTask 传给 worker 的构造方法
        w = new Worker(firstTask); 
          // 取 worker 中的线程对象,之前说了,Worker的构造方法会调用 ThreadFactory 来创建一个新的线程
        final Thread t = w.thread; 
        if (t != null) {
               // 整个线程池的全局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                                    // 小于 SHUTTDOWN 那就是 RUNNING
                if (rs &lt; SHUTDOWN || 
                    // 如果等于 SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务
                    (rs == SHUTDOWN &amp;&amp; firstTask == null)) { 
                      // worker 里面的 thread 可不能是已经启动的
                    if (t.isAlive())  
                        throw new IllegalThreadStateException();
                    // 加到 workers 这个 HashSet 中
                    workers.add(w); 
                    int s = workers.size();
                    // largestPoolSize 用于记录 workers 中的个数的最大值
                    if (s &gt; largestPoolSize) 
                        // 因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值
                        largestPoolSize = s; 
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 添加成功的话,启动这个线程
            if (workerAdded) { 
                // 启动线程
                t.start(); 
                workerStarted = true;
            }
        }
    } finally {
        // 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉
        if (! workerStarted) 
            addWorkerFailed(w);
    }
    // 返回线程是否启动成功
    return workerStarted; 
}

引用美团线程池篇图 执行流程如下图所示

6-addWorker执行流程.png

Worker 线程真正的执行逻辑为 runWorker 方法实现如下

// 此方法由 worker 线程启动后调用,这里用一个 while 循环来不断地从等待队列中获取任务并执行<br />
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// worker 在初始化的时候,可以指定 firstTask,那么第一个任务也就可以不需要从队列中获取
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环调用 getTask 获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池状态大于等于 STOP,那么意味着该线程也要中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 这是一个钩子方法,留给需要的子类实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 到这里终于可以执行任务了
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// 这里不允许抛出 Throwable,所以转换为 Error
thrown = x; throw new Error(x);
} finally {
// 钩子方法,将 task 和异常作为参数,留给需要的子类实现
afterExecute(task, thrown);
}
} finally {
// 置空 task,准备 getTask 获取下一个任务
task = null;
// 累加完成的任务数
w.completedTasks++;
// 释放掉 worker 的独占锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 获取不到任务时,主动回收自己 执行线程关闭 可能getTask 返回 null,也就是说,队列中已经没有任务需要执行了,执行关闭,或者任务执行过程中发生了异常.
processWorkerExit(w, completedAbruptly);
}

执行流程如下图所示

7-runWorker执行流程.png

简单来说 Worker 线程启动后调用,会通过 while 循环来不断地通过 getTask 方法从等待队列中获取任务并执行达到线程回收。

getTask 的实现也比较简单,阻塞直到获取到任务返回,keepAliveTime 超时退出。

   private Runnable getTask() {
boolean timedOut = false;</p>
<pre><code>    for (;;) {
        int c = ctl.get();
        // 获取线程池的状态
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs &gt;= SHUTDOWN &amp;&amp; (rs &gt;= STOP || workQueue.isEmpty())) {
            // CAS 操作,减少工作线程数
            decrementWorkerCount(); 
            return null;
        }
                    
          // 获取线程池中的线程数
        int wc = workerCountOf(c);

        // 允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭
        boolean timed = allowCoreThreadTimeOut || wc &gt; corePoolSize;

        if ((wc &gt; maximumPoolSize || (timed &amp;&amp; timedOut))
            &amp;&amp; (wc &gt; 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try { 
            // 到 workQueue 中获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
           // 如果此 worker 发生了中断,采取的方案是重试
           // 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量,那么意味着超出的部分线程要被关闭。重新进入 for 循环,自然会有部分线程会返回 null
            timedOut = false; 
        }
    }
</code></pre><p style="">需要注意的是</p><ul><li><p style="">线程池处于 SHUTDOWN,而且 workQueue 是空的,该方法返回 null,这种不再接受新的任务。</p></li><li><p style="">线程池中有大于 maximumPoolSize 个 workers 存在,这种可能是因为有可能开发者调用了 setMaximumPoolSize() 将线程池的 maximumPoolSize 调小了,那么多余的 Worker 就需要被关闭</p></li><li><p style="">线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行</p></li><li><p style="">如果此 worker 发生了中断,采取的方案是重试,也就是说如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量,那么意味着超出的部分线程要被关闭。重新进入 for 循环获取任务</p></li></ul><pre><code class="language-java">    public void setMaximumPoolSize(int maximumPoolSize) {
    if (maximumPoolSize &lt;= 0 || maximumPoolSize &lt; corePoolSize)
        throw new IllegalArgumentException();
    this.maximumPoolSize = maximumPoolSize;
    if (workerCountOf(ctl.get()) &gt; maximumPoolSize)
        // 中断 worker 重试 超出的部分线程要被关闭
        interruptIdleWorkers();
}

private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }

引用美团线程池篇图 执行流程如下图所示

8-interruptIdleWorkers流程.png

至此本篇结束

1

评论区