0
点赞
收藏
分享

微信扫一扫

并发编程——ThreadPool原理


摘要

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

一、线程池的作用

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

  1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
  2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
  3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。

为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。池化”思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等领域也有相关的应用。在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:

  1. 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。
  2. 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
  3. 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

  • 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

二、线程池的基本使用

线程池的使用和创建可以说非常的简单,这得益于JDK提供给我们良好封装的API。线程池的实现被封装到了ThreadPoolExecutor中,我们可以通过ThreadPoolExecutor的构造方法来实例化出一个线程池,代码如下:

// 实例化一个线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));

// 使用线程池执行一个任务
executor.execute(() -> {
// Do something
});

// 关闭线程池,会阻止新任务提交,但不影响已提交的任务
executor.shutdown();

// 关闭线程池,阻止新任务提交,并且中断当前正在运行的线程
executor.showdownNow();

创建好线程池后直接调用execute方法并传入一个Runnable参数即可将任务交给线程池执行,通过shutdown/shutdownNow方法可以关闭线程池。

ThreadPoolExecutor的构造方法中参数众多,对于初学者而言在没有了解各个参数的作用的情况下很难去配置合适的线程池。因此Java还为我们提供了一个线程池工具类Executors来快捷的创建线程池。Executors提供了很多简便的创建线程池的方法,举两个例子,代码如下:

// 实例化一个单线程的线程池
ExecutorService singleExecutor = Executors.newSingleThreadExecutor();

// 创建固定线程个数的线程池
ExecutorService fixedExecutor = Executors.newFixedThreadPool(10);

// 创建一个可重用固定线程数的线程池
ExecutorService executorService2 = Executors.newCachedThreadPool();

但是,通常来说在实际开发中并不推荐直接使用Executors来创建线程池,而是需要根据项目实际情况配置适合自己项目的线程池。

三、线程池的生命周期

线程池从诞生到死亡,中间会经历RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五个生命周期状态。

  • RUNNING 表示线程池处于运行状态,能够接受新提交的任务且能对已添加的任务进行处理。RUNNING状态是线程池的初始化状态,线程池一旦被创建就处于RUNNING状态。
  • SHUTDOWN 线程处于关闭状态,不接受新任务,但可以处理已添加的任务。RUNNING状态的线程池调用shutdown后会进入SHUTDOWN状态。
  • STOP 线程池处于停止状态,不接收任务,不处理已添加的任务,且会中断正在执行任务的线程。RUNNING状态的线程池调用了shutdownNow后会进入STOP状态。
  • TIDYING 当所有任务已终止,且任务数量为0时,线程池会进入TIDYING。当线程池处于SHUTDOWN状态时,阻塞队列中的任务被执行完了,且线程池中没有正在执行的任务了,状态会由SHUTDOWN变为TIDYING。当线程处于STOP状态时,线程池中没有正在执行的任务时则会由STOP变为TIDYING。
  • TERMINATED 线程终止状态。处于TIDYING状态的线程执行terminated()后进入TERMINATED状态。

根据上述线程池生命周期状态的描述,可以画出如下所示的线程池生命周期状态流程示意图。

并发编程——ThreadPool原理_阻塞队列

四、线程池的核心参数

使用ThreadPoolExecutor的构造方法来创建了一个线程池。其实在ThreadPoolExecutor中有多个构造方法,但是最终都调用到了下边代码中的这一个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// ...省略校验相关代码

this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

// ...

}

这个构造方法中有7个参数之多,我们逐个来看每个参数所代表的含义:


corePoolSize:表示线程池的核心线程数。当有任务提交到线程池时,如果线程池中的线程数小于corePoolSize,那么则直接创建新的线程来执行任务。


workQueue:任务队列,它是一个阻塞队列,用于存储来不及执行的任务的队列。当有任务提交到线程池的时候,如果线程池中的线程数大于等于corePoolSize,那么这个任务则会先被放到这个队列中,等待执行。


maximumPoolSize:表示线程池支持的最大线程数量。当一个任务提交到线程池时,线程池中的线程数大于corePoolSize,并且workQueue已满,那么则会创建新的线程执行任务,但是线程数要小于等于maximumPoolSize。


keepAliveTime:核心线程数默认是没有超时时间,非核心线程数有(超时时间:若工作线程数大于核心线程数,且从等待队列中超时获取,则把当前线程移除)


unit:非核心线程空闲时保持存活的时间的单位


threadFactory:创建线程的工厂,可以在这里统一处理创建线程的属性


handler:拒绝策略,当线程池中的线程达到maximumPoolSize线程数后且workQueue已满的情况下,再向线程池提交任务则执行对应的拒绝策略

五、线程池工作流程

从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下。

  • 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
  • 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
  • 线程池判断线程池的线程是否都处工作状态。如果没有,则创建新的工作线程来执行任务。如果已经满了,交给饱和策略来处理这个任务。ThreadPool执行execute()方法示意图。

并发编程——ThreadPool原理_java_02

并发编程——ThreadPool原理_线程池_03

 任务缓冲:任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

并发编程——ThreadPool原理_工作线程_04

使用不同的队列可以实现不一样的任务存取策略。在这里,我们可以再介绍下阻塞队列的成员:

并发编程——ThreadPool原理_工作线程_05

任务申请:由上文的任务分配部分可知,任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现,其执行流程如下图所示:

并发编程——ThreadPool原理_阻塞队列_06

getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。

六、线程池的拒绝策略

如果线程池中的线程数达到了maximumPoolSize,并且workQueue队列存储满的情况下,线程池会执行对应的拒绝策略。在JDK中提供了RejectedExecutionHandler接口来执行拒绝操作。实现RejectedExecutionHandler的类有四个,对应了四种拒绝策略。分别如下:

  • DiscardPolicy:当提交任务到线程池中被拒绝时,线程池会丢弃这个被拒绝的任务
  • DiscardOldestPolicy:当提交任务到线程池中被拒绝时,线程池会丢弃等待队列中最老的任务。
  • CallerRunsPolicy:当提交任务到线程池中被拒绝时,会在线程池当前正在运行的Thread线程中处理被拒绝额任务。即哪个线程提交的任务哪个线程去执行。
  • AbortPolicy:当提交任务到线程池中被拒绝时,直接抛出RejectedExecutionException异常。

七、线程池的线程数配置

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
任务的性质:

  • CPU密集型任务
  • IO密集型任务和混合型任务。

性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如配置N:cpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*N cpu。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。

注意:如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。有一次,我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样问题时也会影响到其他任务。

八、线程池的源码分析

8.1 线程池中的位运算

在向线程池提交任务时有两个比较中要的参数会决定任务的去向,这两个参数分别是线程池的状态和线程池中的线程数。在ThreadPoolExecutor内部使用了一个AtomicInteger类型的整数ctl来表示这两个参数,代码如下:

public class ThreadPoolExecutor extends AbstractExecutorService {
// Integer.SIZE = 32.所以 COUNT_BITS= 29
private static final int COUNT_BITS = Integer.SIZE - 3;

// 00011111 11111111 11111111 11111111 这个值可以表示线程池的最大线程容量
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// 将-1左移29位得到RUNNING状态的值
private static final int RUNNING = -1 << COUNT_BITS;

// 线程池运行状态和线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int ctlOf(int rs, int wc) { return rs | wc; }

// ...
}

因为涉及多线程的操作,这里为了保证原子性,ctl参数使用了AtomicInteger类型,并且通过ctlOf方法来计算出了ctl的初始值。如果你不了解位运算大概很难理解上述代码的用意。

int类型在Java中占用4byte的内存,一个byte占用8bit,所以Java中的int类型共占用32bit。对于这个32bit,我们可以进行高低位的拆分。做Android开发的同学应该都了解View测量流程中的MeasureSpec参数,这个参数将32bit的int拆分成了高2位和低30位,分别表示View的测量模式和测量值。而这里的ctl与MeasureSpec类似,ctl将32位的int拆分成了高3位和低29位,分别表示线程池的运行状态和线程池中的线程个数。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

​ctl​​这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

8.2 ThreadPoolExecutor的execute

向线程池提交任务的方法是execute方法,execute方法是ThreadPoolExecutor的核心方法,以此方法为入口来进行剖析,execute方法的代码如下:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取ctl的值
int c = ctl.get();
// 1.线程数小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 线程池中线程数小于核心线程数,则尝试创建核心线程执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.到此处说明线程池中线程数大于核心线程数或者创建线程失败
if (isRunning(c) && workQueue.offer(command)) {
// 如果线程是运行状态并且可以使用offer将任务加入阻塞队列未满,
// offer是非阻塞操作。
int recheck = ctl.get();
// 重新检查线程池状态,因为上次检测后线程池状态可能发生改变,
// 如果非运行状态就移除任务并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果是运行状态,并且线程数是0,则创建线程
else if (workerCountOf(recheck) == 0)
// 线程数是0,则创建非核心线程,且不指定首次执行任务,这里的第二个参数其实没有实际意义
addWorker(null, false);
}
// 3.阻塞队列已满,创建非核心线程执行任务
else if (!addWorker(command, false))
// 如果失败,则执行拒绝策略
reject(command);
}

execute方法中的逻辑可以分为三部分:

  • 1.如果线程池中的线程数小于核心线程,则直接调用addWorker方法创建新线程来执行任务。
  • 2.如果线程池中的线程数大于核心线程数,则将任务添加到阻塞队列中,接着再次检验线程池的运行状态,因为上次检测过之后线程池状态有可能发生了变化,如果线程池关闭了,那么移除任务,执行拒绝策略。如果线程依然是运行状态,但是线程池中没有线程,那么就调用addWorker方法创建线程,注意此时传入任务参数是null,即不指定执行任务,因为任务已经加入了阻塞队列。创建完线程后从阻塞队列中取出任务执行。
  • 3.如果第2步将任务添加到阻塞队列失败了,说明阻塞队列任务已满,那么则会执行第三步,即创建非核心线程来执行任务,如果非核心线程创建失败那么就执行拒绝策略。

接下来看下execute方法中创建线程的方法addWoker,addWoker方法承担了核心线程和非核心线程的创建,通过一个boolean参数core来区分是创建核心线程还是非核心线程。先来看addWorker方法前半部分的代码:

// 返回值表示是否成功创建了线程
private boolean addWorker(Runnable firstTask, boolean core) {
// 这里做了一个retry标记,相当于goto.
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

for (;;) {
// 根据core来确定创建最大线程数,超过最大值则创建线程失败,
// 注意这里的最大值可能有三个corePoolSize、maximumPoolSize和线程池线程的最大容量
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 通过CAS来将线程数+1,如果成功则跳出循环,执行下边逻辑
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 线程池的状态发生了改变,退回retry重新执行
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}

// ...省略后半部分

return workerStarted;
}

这部分代码会通过是否创建核心线程来确定线程池中线程数的值,如果是创建核心线程,那么最大值不能超过corePoolSize,如果是创建非核心线程那么线程数不能超过maximumPoolSize,另外无论是创建核心线程还是非核心线程,最大线程数都不能超过线程池允许的最大线程数COUNT_MASK(有可能设置的maximumPoolSize大于COUNT_MASK)。如果线程数大于最大值就返回false,创建线程失败。

接下来通过CAS将线程数加1,如果成功那么就break retry结束无限循环,如果CAS失败了则就continue retry从新开始for循环,注意这里的retry不是Java的关键字,是一个可以任意命名的字符。

接下来,如果能继续向下执行则开始执行创建线程并执行任务的工作了,看下addWorker方法的后半部分代码:

private boolean addWorker(Runnable firstTask, boolean core) {

// ...省略前半部分

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 实例化一个Worker,内部封装了线程
w = new Worker(firstTask);
// 取出新建的线程
final Thread t = w.thread;
if (t != null) {
// 这里使用ReentrantLock加锁保证线程安全
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
// 拿到锁湖重新检查线程池状态,只有处于RUNNING状态或者
// 处于SHUTDOWN并且firstTask==null时候才会创建线程
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 线程不是处于NEW状态,说明线程已经启动,抛出异常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 将线程加入线程队列,这里的worker是一个HashSet
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 开启线程执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

这部分逻辑其实比较容易理解,就是创建Worker并开启线程执行任务的过程,Worker是对线程的封装,创建的worker会被添加到ThreadPoolExecutor中的HashSet中。也就是线程池中的线程都维护在这个名为workers的HashSet中并被ThreadPoolExecutor所管理,HashSet中的线程可能处于正在工作的状态,也可能处于空闲状态,一旦达到指定的空闲时间,则会根据条件进行回收线程。

我们知道,线程调用start后就会开始执行线程的逻辑代码,执行完后线程的生命周期就结束了,那么线程池是如何保证Worker执行完任务后仍然不结束的呢?当线程空闲超时或者关闭线程池又是怎样进行线程回收的呢?这个实现逻辑其实就在Worker中。看下Worker的代码:

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 执行任务的线程
final Thread thread;
// 初始化Worker时传进来的任务,可能为null,如果不空,
// 则创建和立即执行这个task,对应核心线程创建的情况
Runnable firstTask;

Worker(Runnable firstTask) {
// 初始化时设置setate为-1
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 通过线程工程创建线程
this.thread = getThreadFactory().newThread(this);
}

// 线程的真正执行逻辑
public void run() {
runWorker(this);
}

// 判断线程是否是独占状态,如果不是意味着线程处于空闲状态
protected boolean isHeldExclusively() {
return getState() != 0;
}

// 获取锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// ...
}

Worker是位于ThreadPoolExecutor中的一个内部类,它继承了AQS,使用AQS来实现了独占锁的功能,但是并没支持可重入。这里使用不可重入的特性来表示线程的执行状态,即可以通过isHeldExclusively方法来判断,如果是独占状态,说明线程正在执行任务,如果非独占状态,说明线程处于空闲状态。

另外,Worker还实现了Runnable接口,因此它的执行逻辑就是在run方法中,run方法调用的是线程池中的runWorker(this)方法。任务的执行逻辑就在runWorker方法中,它的代码如下:

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 取出Worker中的任务,可能为空
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task不为null或者阻塞队列中有任务,通过循环不断的从阻塞队列中取出任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
// ...
try {
// 任务执行前的hook点
beforeExecute(wt, task);
try {
// 执行任务
task.run();
// 任务执行后的hook点
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 超时没有取到任务,则回收空闲超时的线程
processWorkerExit(w, completedAbruptly);
}
}

可以看到,runWorker的核心逻辑就是不断通过getTask方法从阻塞队列中获取任务并执行.通过这样的方式实现了线程的复用,避免了创建线程。这里要注意的是这里是一个“生产者-消费者”模式,getTask是从阻塞队列中取任务,所以如果阻塞队列中没有任务的时候就会处于阻塞状态。getTask中通过判断是否要回收线程而设置了等待超时时间,如果阻塞队列中一直没有任务,那么在等待keepAliveTime时间后会返回一个null。最终会走到上述代码的finally方法中,意味着有线程空闲时间超过了keepAliveTime时间,那么调用processWorkerExit方法移除Worker。

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
// ...


// Flag1. 如果配置了allowCoreThreadTimeOut==true或者线程池中的
// 线程数大于核心线程数,则timed为true,表示开启指定线程超时后被回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// ...

try {
// Flag2. 取出阻塞队列中的任务,注意如果timed为true,则会调用阻塞队列的poll方法,
// 并设置超时时间为keepAliveTime,如果超时没有取到任务则会返回null。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

重点看getTask是如何处理空闲超时的逻辑的。我们知道,回收线程的条件是线程大于核心线程数或者配置了allowCoreThreadTimeOut为true,当线程空闲超时的情况下就会回收线程。上述代码在Flag1处先判断了如果线程池中的线程数大于核心线程数,或者开启了allowCoreThreadTimeOut,那么就需要开启线程空闲超时回收。所有在Flag2处,timed为true的情况下调用了阻塞队列的poll方法,并传入了超时时间为keepAliveTime,poll方法是一个阻塞方法,在没有任务时候回进行阻塞。如果在keepAliveTime时间内,没有获取到任务,那么poll方法就会返回null,结束runWorker的循环。进而执行runWorker方法中回收线程的操作。

这里需要我们理解阻塞队列poll方法的使用,poll方法接受一个时间参数,是一个阻塞操作,在给定的时间内没有获取到数据就返回null。poll方法的核心代码如下:

while (count == 0) { 
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}

8.3 ThreadPoolExecutor的拒绝策略

上一小节中我们多次提到线程池的拒绝策略,它是在reject方法中实现的。实现代码也非常简单,代码如下:

final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

通过调用handler的rejectedExecution方法实现。这里其实就是运用了策略模式,handler是一个RejectedExecutionHandler类型的成员变量,RejectedExecutionHandler是一个接口,只有一个rejectedExecution方法。在实例化线程池时构造方法中传入对应的拒绝策略实例即可。前文已经提到了Java提供的几种默认实现分别为DiscardPolicy、DiscardOldestPolicy、CallerRunsPolicy以及AbortPolicy。

public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

8.4 ThreadPoolExecutor的shutdown

调用shutdown方法后,会将线程池标记为SHUTDOWN状态,上边execute的源码可以看出,只有线程池是RUNNING状态才接受任务,因此被标记位SHUTDOWN后,再提交任务会被线程池拒绝。shutdown的代码如下:

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否可以关闭线程
checkShutdownAccess();
// 将线程池状态置为SHUTDOWN状态
advanceRunState(SHUTDOWN);
// 尝试中断空闲线程
interruptIdleWorkers();
// 空方法,线程池关闭的hook点
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}

private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

修改线程池为SHUTDOWN状态后,会调用interruptIdleWorkers去中断空闲线程线程,具体实现逻辑是在interruptIdleWorkers(boolean onlyOne)方法中,如下:

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 尝试tryLock获取锁,如果拿锁成功说明线程是空闲状态
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

shutdown的逻辑比较简单,里边做了两件比较重要的事情,即先将线程池状态修改为SHUTDOWN,接着遍历所有Worker,将空闲的Worker进行中断。

九、线程池在业务中的实践

在当今的互联网业界,为了最大程度利用CPU的多核性能,并行运算的能力是不可或缺的。通过线程池管理线程获取并发性是一个非常基础的操作,让我们来看两个典型的使用线程池获取并发性的场景。

场景1:快速响应用户请求

描述:用户发起的实时请求,服务追求响应时间。比如说用户要查看一个商品的信息,那么我们需要将商品维度的一系列信息如商品的价格、优惠、库存、图片等等聚合起来,展示给用户。

分析:从用户体验角度看,这个结果响应的越快越好,如果一个页面半天都刷不出,用户可能就放弃查看这个商品了。而面向用户的功能聚合通常非常复杂,伴随着调用与调用之间的级联、多级级联等情况,业务开发同学往往会选择使用线程池这种简单的方式,将调用封装成任务并行的执行,缩短总体响应时间。另外,使用线程池也是有考量的,这种场景最重要的就是获取最大的响应速度去满足用户,所以应该不设置队列去缓冲并发任务,调高corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务。

并发编程——ThreadPool原理_工作线程_07

场景2:快速处理批量任务

描述:离线的大量计算任务,需要快速执行。比如说,统计某个报表,需要计算出全国各个门店中有哪些商品有某种属性,用于后续营销策略的分析,那么我们需要查询全国所有门店中的所有商品,并且记录具有某属性的商品,然后快速生成报表。

分析:这种场景需要执行大量的任务,我们也会希望任务执行的越快越好。这种情况下,也应该使用多线程策略,并行计算。但与响应速度优先的场景区别在于,这类场景任务量巨大,并不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。

并发编程——ThreadPool原理_工作线程_08

十、实际问题及方案思考

线程池使用面临的核心的问题在于:线程池的参数并不好配置。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,IO密集型和CPU密集型的任务运行起来的情况差异非常大,这导致业界并没有一些成熟的经验策略帮助开发人员参考。

关于线程池配置不合理引发的故障,公司内部有较多记录,下面举一些例子:

Case1:2018年XX页面展示接口大量调用降级:

事故描述:XX页面展示接口产生大量调用降级,数量级在几十到上百。

事故原因:该服务展示接口内部逻辑使用线程池做并行计算,由于没有预估好调用的流量,导致最大核心数设置偏小,大量抛出RejectedExecutionException,触发接口降级条件,示意图如下:

并发编程——ThreadPool原理_线程池_09

Case2:2018年XX业务服务不可用S2级故障

事故描述:XX业务提供的服务执行时间过长,作为上游服务整体超时,大量下游服务调用失败。

事故原因:该服务处理请求内部逻辑使用线程池做资源隔离,由于队列设置过长,最大线程数设置失效,导致请求数量增加时,大量任务堆积在队列中,任务执行时间过长,最终导致下游服务的大量调用超时失败。示意图如下:

并发编程——ThreadPool原理_线程池_10

1. 能否不用线程池?

回到最初的问题,业务使用线程池是为了获取并发性,对于获取并发性,是否可以有什么其他的方案呢替代?我们尝试进行了一些其他方案的调研:

并发编程——ThreadPool原理_阻塞队列_11

 综合考虑,这些新的方案都能在某种情况下提升并行任务的性能,然而本次重点解决的问题是如何更简易、更安全地获得的并发性。另外,Actor模型的应用实际上甚少,只在Scala中使用广泛,协程框架在Java中维护的也不成熟。这三者现阶段都不是足够的易用,也并不能解决业务上现阶段的问题。

2. 追求参数设置合理性?

有没有一种计算公式,能够让开发同学很简易地计算出某种场景中的线程池应该是什么参数呢?

带着这样的疑问,我们调研了业界的一些线程池参数配置方案:

并发编程——ThreadPool原理_线程池_12

调研了以上业界方案后,我们并没有得出通用的线程池计算方式。并发任务的执行情况和任务类型相关,IO密集型和CPU密集型的任务运行起来的情况差异非常大,但这种占比是较难合理预估的,这导致很难有一个简单有效的通用公式帮我们直接计算出结果。

3. 线程池参数动态化?

尽管经过谨慎的评估,仍然不能够保证一次计算出来合适的参数,那么我们是否可以将修改线程池参数的成本降下来,这样至少可以发生故障的时候可以快速调整从而缩短故障恢复的时间呢?基于这个思考,我们是否可以将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程对比如下:

并发编程——ThreadPool原理_线程池_13

10.1 动态化线程池

10.1.1 整体设计

动态化线程池的核心设计包括以下三个方面:

  1. 简化线程池配置:线程池构造参数有8个,但是最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种:(1)并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。(2)并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求,Less is More。
  2. 参数可动态修改:为了解决参数不好配,修改参数成本高等问题。在Java线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。
  3. 增加线程池监控:对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助开发同学了解线程池状态。

并发编程——ThreadPool原理_java_14

10.1.2 功能架构

动态化线程池提供如下功能:

  • 动态调参:支持线程池参数动态调整、界面化操作;包括修改线程池核心大小、最大核心大小、队列长度等;参数修改后及时生效。
  • 任务监控:支持应用粒度、线程池粒度、任务粒度的Transaction监控;可以看到线程池的任务执行情况、最大任务执行时间、平均任务执行时间、95/99线等。
  • 负载告警:线程池队列任务积压到一定值的时候会通过大象(美团内部通讯工具)告知应用开发负责人;当线程池负载数达到一定阈值的时候会通过大象告知应用开发负责人。
  • 操作监控:创建/修改和删除线程池都会通知到应用的开发负责人。
  • 操作日志:可以查看线程池参数的修改记录,谁在什么时候修改了线程池参数、修改前的参数值是什么。
  • 权限校验:只有应用开发负责人才能够修改应用的线程池参数。

并发编程——ThreadPool原理_阻塞队列_15

 参数动态化

JDK原生线程池ThreadPoolExecutor提供了如下几个public的setter方法,如下图所示:

并发编程——ThreadPool原理_阻塞队列_16

JDK允许线程池使用方通过ThreadPoolExecutor的实例来动态设置线程池的核心策略,以setCorePoolSize为方法例,在运行期线程池使用方调用此方法设置corePoolSize之后,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前idle的worker线程发起中断请求以实现回收,多余的worker在下次idel的时候也会被回收;对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务,setCorePoolSize具体流程如下:

并发编程——ThreadPool原理_java_17

线程池监控

除了参数动态化之外,为了更好地使用线程池,我们需要对线程池的运行状况有感知,比如当前线程池的负载是怎么样的?分配的资源够不够用?任务的执行情况是怎么样的?是长任务还是短任务?基于对这些问题的思考,动态化线程池提供了多个维度的监控和告警能力,包括:线程池活跃度、任务的执行Transaction(频率、耗时)、Reject异常、线程池内部统计信息等等,既能帮助用户从多个维度分析线程池的使用情况,又能在出现问题第一时间通知到用户,从而避免故障或加速故障恢复。

1. 负载监控和告警

线程池负载关注的核心问题是:基于当前线程池参数分配的资源够不够。对于这个问题,我们可以从事前和事中两个角度来看。事前,线程池定义了“活跃度”这个概念,来让用户在发生Reject异常之前能够感知线程池负载问题,线程池活跃度计算公式为:线程池活跃度 = activeCount/maximumPoolSize。这个公式代表当活跃线程数趋向于maximumPoolSize的时候,代表线程负载趋高。事中,也可以从两方面来看线程池的过载判定条件,一个是发生了Reject异常,一个是队列中有等待任务(支持定制阈值)。以上两种情况发生了都会触发告警,告警信息会通过大象推送给服务所关联的负责人。

并发编程——ThreadPool原理_线程池_18

2. 任务级精细化监控

在传统的线程池应用场景中,线程池中的任务执行情况对于用户来说是透明的。比如在一个具体的业务场景中,业务开发申请了一个线程池同时用于执行两种任务,一个是发消息任务、一个是发短信任务,这两类任务实际执行的频率和时长对于用户来说没有一个直观的感受,很可能这两类任务不适合共享一个线程池,但是由于用户无法感知,因此也无从优化。动态化线程池内部实现了任务级别的埋点,且允许为不同的业务任务指定具有业务含义的名称,线程池内部基于这个名称做Transaction打点,基于这个功能,用户可以看到线程池内部任务级别的执行情况,且区分业务,任务监控示意图如下图所示:

并发编程——ThreadPool原理_java_19

3. 运行时状态实时查看

用户基于JDK原生线程池ThreadPoolExecutor提供的几个public的getter方法,可以读取到当前线程池的运行状态以及参数,如下图所示:

 

并发编程——ThreadPool原理_阻塞队列_20

动态化线程池基于这几个接口封装了运行时状态实时查看的功能,用户基于这个功能可以了解线程池的实时状态,比如当前有多少个工作线程,执行了多少个任务,队列中等待的任务数等等。效果如下图所示:

并发编程——ThreadPool原理_工作线程_21

举报

相关推荐

0 条评论