更多JUC源码解读系列文章请持续关注JUC源码解读文章目录JDK8!
文章目录
- 一、前言
- 二、提交任务涉及到的核心参数
- 三、线程池的生命周期
- 四、ThreadFactory如何创建工作线程
- 五、四种官方拒接策略
- 六、总结
一、前言
在Java中,创建一个线程new Thread
,就像创建一个对象一样简单,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,并且要为线程分配一系列的资源,所以线程是一个重量级的对象,应该避免频繁创建和销毁。线程池的出现就是为了管理线程,让线程复用。
阿里巴巴java开发手册中有一条强制的编程规约:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
既然推荐通过ThreadPoolExecutor
的方式使用线程池,则有必要了解一些底层原理,如:线程池的几个核心参数,提交任务的过程,任务是如何在线程池中运行,线程是如何复用,线程池的状态流转等。了解了这些原理知识,不仅可以轻松应对面试,更重要的是在实际工作中更好、更正确的使用线程池;同时如果出了bug,懂些原理也能快速排查问题所在。
二、提交任务涉及到的核心参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
从ThreadPoolExecutor构造函数来看,不得不说的几个参数,核心线程数corePoolSize
、最大线程数maximumPoolSize
、工作队列workQueue
、线程工厂threadFactory
以及拒绝策略RejectedExecutionHandler
,他们之间有着千丝万缕的关系:
- 当创建的线程数小于核心线程数
corePoolSize
时,提交任务会继续创建新线程执行任务。 - 当创建的线程数大于等于
corePoolSize
时,此时再提交任务将被添加到工作队列workQueue
中。 - 当工作队列
workQueue
已满,此时再提交任务会创建新线程,触发第二个阈值的判断maximumPoolSize
。 - 当创建的线程数大于等于最大线程数
maximumPoolSize
时,此时再提交任务将触发拒绝策略RejectedExecutionHandler
。
白纸黑字总是苍白的,如下是提交任务至线程池的流程图:
除了具有主角光环的参数外,还有几个参数决定着工作线程的生死存亡。keepAliveTime
决定非核心线程数的线程的存活时长。当线程池中创建的线程数量超过设置的 corePoolSize
,在某些线程处理完任务后,如果等待 keepAliveTime
时间,仍然没有新的任务分配给它们,那么这些线程就属于空闲线程,将会被回收。线程池回收线程,没有所谓的“核心线程”和“非核心线程”之分,直到线程池的线程数等于最小核心线程数corePoolSize
,回收才会停止。
看样子线程池一定会有小于等于corePoolSize
数量的线程一直存活,这样如果这个线程池是非核心线程池,一直占用着线程势必会影响到核心线程池的运行,所以核心线程数内的线程也有被回收的需求。
在创建线程池时,构造函数中并没有显式设置核心线程数内的线程过期回收的参数,但是可以通过调用allowCoreThreadTimeOut(true)
方法将属性allowCoreThreadTimeOut
设置为true,从而使得核心线程数内的线程空闲等待keepAliveTime
时间后,依然没有任务分配时被回收。
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
三、线程池的生命周期
世间万物都有生死轮回,线程池也不例外,它也有自己的生命周期。而巧妙的是,作者Doug Lea用一个32位的int变量表示两种含义:高3位表示线程池的运行状态,低29位表示工作线程数。
//一个ctl 表示两种含义,高3位为runState,低29为workerCount
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
//536870911
//0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//-536870912
//111 0 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING = -1 << COUNT_BITS;
//0
//000 0 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//536870912
//001 0 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
//1073741824
//010 0 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
//1610612736
//011 0 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
//从变量ctl中解析出runState
//先将CAPACITY做按位非操作,即~n = - ( n+1 ),就是 RUNNING
//然后再做按位与,可得出高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
//从变量ctl中解析出workerCount
//对CAPACITY按位与,可得出低29位
//1 & 1 = 1,1 & 0 = 0,0 & 1 = 0,0 & 0 = 0
private static int workerCountOf(int c) { return c & CAPACITY; }
//将rs和wc转为二进制 再进行按位或计算,位上只要有1就是该位就是1
// 1 & 1 = 1,1 & 0 = 1,0 & 1 = 1,0 & 0 = 0
private static int ctlOf(int rs, int wc) { return rs | wc; }
-
CAPACITY = (1 << COUNT_BITS) - 1
,1转为二进制数0000 0000 0000 0000 0000 0000 0000 0001
向左移29位,得到001 0 0000 0000 0000 0000 0000 0000 0000
,这个过程相当于1*2^29,但是此时得到的是高3位的第一位(最小值),在其基础上减1就是低29位的最大值了,得到0001 1111 1111 1111 1111 1111 1111 1111
,将其设置为线程池的容量CAPACITY
。 -
RUNNING = -1 << COUNT_BITS
,创建线程池后,线程池就处于正在运行状态RUNNING
,其是-1向左移29位,-1的二进制是1111 1111 1111 1111 1111 1111 1111 1111
,向左移29位后剩下3个1,低29位补0,得到1110 0000 0000 0000 0000 0000 0000 0000
,正好1占满了高3位。(-1的二进制是1的补码,原码取反+1就是补码,如1的原码是0000 0000 0000 0000 0000 0000 0000 0001
,取反后是1111 1111 1111 1111 1111 1111 1111 1110
,再加1就是1111 1111 1111 1111 1111 1111 1111 1111
。) -
SHUTDOWN = 0 << COUNT_BITS
,当调用shutdown()
,线程池进入SHUTDOWN
状态,SHUTDOWN
是0向左移29位依然是0。 -
STOP = 1 << COUNT_BITS
,当调用shutdownNow()
,线程池进入STOP
状态,1左移29位得到001 0 0000 0000 0000 0000 0000 0000 0000
。 -
TIDYING = 2 << COUNT_BITS
,TIDYING
是一个过渡状态,当线程池调用shutdown()
或shutdownNow()
后,当线程池中没有正在运行的线程且工作队列为空,此时设置线程池状态为TIDYING
。2左移29位得到010 0 0000 0000 0000 0000 0000 0000 0000
。 -
TERMINATED = 3 << COUNT_BITS
,TERMINATED
才是代表线程池真正的寿终正寝。3左移29位得到011 0 0000 0000 0000 0000 0000 0000 0000
。
还有三个方法才是将两个含义揉捻成一个变量,又分别拆出两个含义:
-
ctlOf(int rs, int wc) { return rs | wc; }
,运行状态rs
和工作线程数wc
,二者进行按位或|
计算合成一个变量ctl
。(|
操作,1 & 1 = 1,1 & 0 = 1,0 & 1 = 1,0 & 0 = 0
,二者比较,只要位上有1,该位就是1。) -
runStateOf(int c) { return c & ~CAPACITY; }
,从中拆出运行状态。先对CAPACITY
做按位非~
操作,即~CAPACITY = - ( CAPACITY+1 )
,就是 RUNNING
。然后按位与&
操作,可得高3位。(RUNNING
高3位都是1,低29位都是0,所以&
运算后,1只会出现在高3位,故而可得高3位) -
workerCountOf(int c) { return c & CAPACITY; }
,从中拆出工作线程数。对CAPACITY
按位与&
,可得低29位。(因为CAPACITY
低29位都是1,高3位都是0,所以&
运算后,1只会出现在低29位,故而可得低29位)
(&
操作,1 & 1 = 1,1 & 0 = 0,0 & 1 = 0,0 & 0 = 0
,二者比较,位上都是1,该位才是1。)
如图所示线程池的生命周期流转图:
四、ThreadFactory如何创建工作线程
创建线程池时可以不传ThreadFactory
,此时会给一个默认线程工厂Executors.defaultThreadFactory()
,而它究竟是怎样生产工作线程的呢?
DefaultThreadFactory
实现了接口ThreadFactory
,其主要做了3件事:
- 创建工作线程,并设置分组和命名。
- 工作线程是守护线程时,将线程设置为非守护线程。
- 设置工作线程默认优先级
NORM_PRIORITY
。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
//java.util.concurrent.Executors.DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
//创建线程时,设置分组和命名
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
用户可以自行实现ThreadFactory
接口,设计特殊的线程工厂。
五、四种官方拒接策略
当工作线程数大于等于maximumPoolSize
时,此时再提交任务将会触发拒绝策略。
创建线程池时也可不传RejectedExecutionHandler
,此时会给一个默认的拒绝策略AbortPolicy
。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
AbortPolicy
总是会抛出一个RejectedExecutionException
异常,再无其他操作。
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
另外官方还提供了3种拒绝策略:
- CallerRunsPolicy 直接调用
run()
执行任务代码。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
- DiscardPolicy 什么也不做。
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
- DiscardOldestPolicy 丢弃工作队列中的下一个任务(排在head的任务),也是最老的任务,并调用execute()处理新任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
除了官方提供的四个拒绝策略外,用户还可以自行实现RejectedExecutionHandler
接口设计特殊的拒绝策略。
六、总结
- 【强制】线程池不允许使用
Executors
去创建,而是通过 ThreadPoolExecutor
的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 - 线程数小于
corePoolSize
时,提交任务会继续创建线程,大于等于corePoolSize
时,提交任务将加到工作队列中。 - 当工作队列已满,此时提交任务会创建线程,直到线程数达到
maximumPoolSize
,再提交任务触发拒绝策略。 - 默认情况下线程池中只会保留小于等于
corePoolSize
数量的线程,多余corePoolSize
的空闲线程会等待keepAliveTime
时间后,依然没有任务分配,则被回收。 - 可以将
allowCoreThreadTimeOut
设置为true,使得corePoolSize
数量内的线程,空闲等待keepAliveTime
时间后,依然没有任务分配,则被回收。 -
ctl
高3位表示线程池运行状态,低29位表示工作线程数。 -
ThreadFactory
和RejectedExecutionHandler
都可自定义。
PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!