0
点赞
收藏
分享

微信扫一扫

ThreadPoolExecutor源码解读(一)——重新认识ThreadPoolExecutor(核心参数、生命周期、位运算、ThreadFactory、拒接策略)


更多JUC源码解读系列文章请持续关注​​JUC源码解读文章目录JDK8​​!


文章目录


  • ​​一、前言​​
  • ​​二、提交任务涉及到的核心参数​​
  • ​​三、线程池的生命周期​​
  • ​​四、ThreadFactory如何创建工作线程​​
  • ​​五、四种官方拒接策略​​
  • ​​六、总结​​


一、前言

在Java中,创建一个线程​​new Thread​​,就像创建一个对象一样简单,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,并且要为线程分配一系列的资源,所以线程是一个重量级的对象,应该避免频繁创建和销毁。线程池的出现就是为了管理线程,让线程复用。

阿里巴巴java开发手册中有一条强制的编程规约:


【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样

的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。


既然推荐通过​​ThreadPoolExecutor​​的方式使用线程池,则有必要了解一些底层原理,如:线程池的几个核心参数,提交任务的过程,任务是如何在线程池中运行,线程是如何复用,线程池的状态流转等。了解了这些原理知识,不仅可以轻松应对面试,更重要的是在实际工作中更好、更正确的使用线程池;同时如果出了bug,懂些原理也能快速排查问题所在。

二、提交任务涉及到的核心参数

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

从ThreadPoolExecutor构造函数来看,不得不说的几个参数,核心线程数​​corePoolSize​​​、最大线程数​​maximumPoolSize​​​、工作队列​​workQueue​​​、线程工厂​​threadFactory​​​以及拒绝策略​​RejectedExecutionHandler​​,他们之间有着千丝万缕的关系:


  1. 当创建的线程数小于核心线程数​​corePoolSize​​时,提交任务会继续创建新线程执行任务。
  2. 当创建的线程数大于等于​​corePoolSize​​​时,此时再提交任务将被添加到工作队列​​workQueue​​中。
  3. 当工作队列​​workQueue​​​已满,此时再提交任务会创建新线程,触发第二个阈值的判断​​maximumPoolSize​​。
  4. 当创建的线程数大于等于最大线程数​​maximumPoolSize​​​时,此时再提交任务将触发拒绝策略​​RejectedExecutionHandler​​。

白纸黑字总是苍白的,如下是提交任务至线程池的流程图:

ThreadPoolExecutor源码解读(一)——重新认识ThreadPoolExecutor(核心参数、生命周期、位运算、ThreadFactory、拒接策略)_ThreadPool

除了具有主角光环的参数外,还有几个参数决定着工作线程的生死存亡。​​keepAliveTime​​​决定非核心线程数的线程的存活时长。当线程池中创建的线程数量超过设置的 ​​corePoolSize​​​,在某些线程处理完任务后,如果等待 ​​keepAliveTime​​​时间,仍然没有新的任务分配给它们,那么这些线程就属于空闲线程,将会被回收。线程池回收线程,没有所谓的“核心线程”和“非核心线程”之分,直到线程池的线程数等于最小核心线程数​​corePoolSize​​,回收才会停止。

看样子线程池一定会有小于等于​​corePoolSize​​数量的线程一直存活,这样如果这个线程池是非核心线程池,一直占用着线程势必会影响到核心线程池的运行,所以核心线程数内的线程也有被回收的需求。

在创建线程池时,构造函数中并没有显式设置核心线程数内的线程过期回收的参数,但是可以通过调用​​allowCoreThreadTimeOut(true)​​​方法将属性​​allowCoreThreadTimeOut​​​设置为true,从而使得核心线程数内的线程空闲等待​​keepAliveTime​​时间后,依然没有任务分配时被回收。

public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}

三、线程池的生命周期

世间万物都有生死轮回,线程池也不例外,它也有自己的生命周期。而巧妙的是,作者Doug Lea用一个32位的int变量表示两种含义:高3位表示线程池的运行状态,低29位表示工作线程数。

//一个ctl 表示两种含义,高3位为runState,低29为workerCount
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
//536870911
//0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
//-536870912
//111 0 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING = -1 << COUNT_BITS;
//0
//000 0 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//536870912
//001 0 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
//1073741824
//010 0 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
//1610612736
//011 0 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
//从变量ctl中解析出runState
//先将CAPACITY做按位非操作,即~n = - ( n+1 ),就是 RUNNING
//然后再做按位与,可得出高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
//从变量ctl中解析出workerCount
//对CAPACITY按位与,可得出低29位
//1 & 1 = 1,1 & 0 = 0,0 & 1 = 0,0 & 0 = 0
private static int workerCountOf(int c) { return c & CAPACITY; }
//将rs和wc转为二进制 再进行按位或计算,位上只要有1就是该位就是1
// 1 & 1 = 1,1 & 0 = 1,0 & 1 = 1,0 & 0 = 0
private static int ctlOf(int rs, int wc) { return rs | wc; }

  • ​CAPACITY = (1 << COUNT_BITS) - 1​​​,1转为二进制数​​0000 0000 0000 0000 0000 0000 0000 0001​​​向左移29位,得到​​001 0 0000 0000 0000 0000 0000 0000 0000​​​,这个过程相当于1*2^29,但是此时得到的是高3位的第一位(最小值),在其基础上减1就是低29位的最大值了,得到​​0001 1111 1111 1111 1111 1111 1111 1111​​​,将其设置为线程池的容量​​CAPACITY​​。
  • ​RUNNING = -1 << COUNT_BITS​​​,创建线程池后,线程池就处于正在运行状态​​RUNNING​​​,其是-1向左移29位,-1的二进制是​​1111 1111 1111 1111 1111 1111 1111 1111​​​,向左移29位后剩下3个1,低29位补0,得到​​1110 0000 0000 0000 0000 0000 0000 0000​​​,正好1占满了高3位。(-1的二进制是1的补码,原码取反+1就是补码,如1的原码是​​0000 0000 0000 0000 0000 0000 0000 0001​​​,取反后是​​1111 1111 1111 1111 1111 1111 1111 1110​​​,再加1就是​​1111 1111 1111 1111 1111 1111 1111 1111​​。)
  • ​SHUTDOWN = 0 << COUNT_BITS​​​,当调用​​shutdown()​​​,线程池进入​​SHUTDOWN​​​状态,​​SHUTDOWN​​是0向左移29位依然是0。
  • ​STOP = 1 << COUNT_BITS​​​,当调用​​shutdownNow()​​​,线程池进入​​STOP​​​状态,1左移29位得到​​001 0 0000 0000 0000 0000 0000 0000 0000​​。
  • ​TIDYING = 2 << COUNT_BITS​​​,​​TIDYING​​​是一个过渡状态,当线程池调用​​shutdown()​​​或​​shutdownNow()​​​后,当线程池中没有正在运行的线程且工作队列为空,此时设置线程池状态为​​TIDYING​​​。2左移29位得到​​010 0 0000 0000 0000 0000 0000 0000 0000​​。
  • ​TERMINATED = 3 << COUNT_BITS​​​,​​TERMINATED​​​才是代表线程池真正的寿终正寝。3左移29位得到​​011 0 0000 0000 0000 0000 0000 0000 0000​​。

还有三个方法才是将两个含义揉捻成一个变量,又分别拆出两个含义:


  • ​ctlOf(int rs, int wc) { return rs | wc; }​​​,运行状态​​rs​​​和工作线程数​​wc​​​,二者进行按位或​​|​​​计算合成一个变量​​ctl​​​。(​​|​​​操作,​​1 & 1 = 1,1 & 0 = 1,0 & 1 = 1,0 & 0 = 0​​,二者比较,只要位上有1,该位就是1。)
  • ​runStateOf(int c) { return c & ~CAPACITY; }​​​,从中拆出运行状态。先对​​CAPACITY​​​做按位非​​~​​​操作,即​​~CAPACITY = - ( CAPACITY+1 )​​​,就是 ​​RUNNING​​​。然后按位与​​&​​​操作,可得高3位。(​​RUNNING​​​高3位都是1,低29位都是0,所以​​&​​运算后,1只会出现在高3位,故而可得高3位)
  • ​workerCountOf(int c) { return c & CAPACITY; }​​​,从中拆出工作线程数。对​​CAPACITY​​​按位与​​&​​​,可得低29位。(因为​​CAPACITY​​​低29位都是1,高3位都是0,所以​​&​​运算后,1只会出现在低29位,故而可得低29位)

(​​&​​​操作,​​1 & 1 = 1,1 & 0 = 0,0 & 1 = 0,0 & 0 = 0​​,二者比较,位上都是1,该位才是1。)

如图所示线程池的生命周期流转图:

ThreadPoolExecutor源码解读(一)——重新认识ThreadPoolExecutor(核心参数、生命周期、位运算、ThreadFactory、拒接策略)_位运算_02

四、ThreadFactory如何创建工作线程

创建线程池时可以不传​​ThreadFactory​​​,此时会给一个默认线程工厂​​Executors.defaultThreadFactory()​​,而它究竟是怎样生产工作线程的呢?

​DefaultThreadFactory​​​实现了接口​​ThreadFactory​​,其主要做了3件事:


  • 创建工作线程,并设置分组和命名。
  • 工作线程是守护线程时,将线程设置为非守护线程。
  • 设置工作线程默认优先级​​NORM_PRIORITY​​。

public interface ThreadFactory {
Thread newThread(Runnable r);
}
//java.util.concurrent.Executors.DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
//创建线程时,设置分组和命名
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

用户可以自行实现​​ThreadFactory​​接口,设计特殊的线程工厂。

五、四种官方拒接策略

当工作线程数大于等于​​maximumPoolSize​​时,此时再提交任务将会触发拒绝策略。

创建线程池时也可不传​​RejectedExecutionHandler​​​,此时会给一个默认的拒绝策略​​AbortPolicy​​。

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

​AbortPolicy​​​总是会抛出一个​​RejectedExecutionException​​异常,再无其他操作。

public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

另外官方还提供了3种拒绝策略:

  1. CallerRunsPolicy​ 直接调用​​run()​​执行任务代码。
public static class CallerRunsPolicy implements RejectedExecutionHandler {

public CallerRunsPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
  1. DiscardPolicy​ 什么也不做。
public static class DiscardPolicy implements RejectedExecutionHandler {

public DiscardPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
  1. DiscardOldestPolicy​ 丢弃工作队列中的下一个任务(排在head的任务),也是最老的任务,并调用execute()处理新任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

public DiscardOldestPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

除了官方提供的四个拒绝策略外,用户还可以自行实现​​RejectedExecutionHandler​​接口设计特殊的拒绝策略。

六、总结


  • 【强制】线程池不允许使用 ​​Executors​​​去创建,而是通过 ​​ThreadPoolExecutor​​的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
  • 线程数小于​​corePoolSize​​​时,提交任务会继续创建线程,大于等于​​corePoolSize​​时,提交任务将加到工作队列中。
  • 当工作队列已满,此时提交任务会创建线程,直到线程数达到​​maximumPoolSize​​,再提交任务触发拒绝策略。
  • 默认情况下线程池中只会保留小于等于​​corePoolSize​​​数量的线程,多余​​corePoolSize​​​的空闲线程会等待​​keepAliveTime​​时间后,依然没有任务分配,则被回收。
  • 可以将​​allowCoreThreadTimeOut​​​设置为true,使得​​corePoolSize​​​数量内的线程,空闲等待​​keepAliveTime​​时间后,依然没有任务分配,则被回收。
  • ​ctl​​高3位表示线程池运行状态,低29位表示工作线程数。
  • ​ThreadFactory​​​和​​RejectedExecutionHandler​​都可自定义。

PS: ​如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!



举报

相关推荐

0 条评论