本文不讲解线程池执行过程过状态的判断,如要阅读可留言索要源码分析代码
基本流程:
用一个AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))的原子类来保存线程数量和线程池状态,共32位, 高3位保存运行状态, 低29位保存线程数量。其中根据运行状态和线程数量来判断是否需要添加工作线程池。然后调用addWork()创建工作线程Work,每个Work包含thread、firstTask、completedTasks。工作线程创建会调用runWoker()方法来从队列取任务,达到线程复用的效果。
线程池怎么达到线程复用?
在调用runWoker时,会判断task是否为空,如果不为空则直接处理,为空则getTask()从队列中获取
/**
* 执行线程任务的真正处理逻辑
* 1 如果 task 不为空,则开始执行 task
* 2 如果 task 为空,则通过 getTask()再去取任务,并赋值给 task,如果取到的 Runnable 不为空,则执行该任务
* 3 执行完毕后,通过 while 循环继续 getTask()取任务
* 4 如果 getTask()取到的任务依然是空,那么整个 runWorker()方法执行完毕
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
// unlock,表示当前 worker 线程允许中断,因为 new Worker 默认的 state=-1,此处是调用
// Worker 类的 tryRelease()方法,将 state 置为 0,
// 而 interruptIfStarted()中只有 state>=0 才允许调用中断
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//注意这个 while 循环,在这里实现了 [线程复用] , 如果 task 为空,则通过getTask 来获取任务
while (task != null || (task = getTask()) != null) {} // 这里达到了线程复用的效果
....省略.....
}
Worker 继承了 AQS,使用 AQS 来实现独占锁的功能。为什么不使用 ReentrantLock 来实现呢?
worker实现的tryAcquire 方法,它是不允许重入的,而 ReentrantLock 是允许重入的。这样容易中断正在执行的任务,就是 为了避免锁可重入导致正在运行中的任务中断 。
线程池提供了多个自定义模板方法,方便监听线程状态或者实现钩子:
protected void beforeExecute(Thread t, Runnable r) { }// 执行前
protected void afterExecute(Runnable r, Throwable t) { }// 执行后
void onShutdown() {}//关闭
protected void terminated() { }//终止
举例子:如果公司需要自定义并行处理框架,如https://gitee.com/jd-platform-opensource/asyncTool,肯定是需要监听线程池状态,然后进行上报,这时候就需要实现beforeExecute、afterExecute、onShutdown来进行监听了
有一种特殊情况。就是线程池coreSize设置为0时(队列和最大线程数不为0),这时候为什么没有直接拒绝策略?而且采用单线程coreSize=1执行的方式?
-- ???为什么我会发现这个问题?因为我看到xxl-job客户端源码callbackThreadPool 的coreSize=0,所以就很奇怪
因为Woker实现了Runnable接口,并重写了run()方法,如果判断到task即传过来runnable为空,就会从队列里面去一个task来用。这就导致了如果coreSize=0,就会是coreSize=1的效果来表现。
``` 以下是代码,路径为:execute()->addWorker->run->runWorker->(task = getTask()) != null
// 调用线程池调用的execute()方法
.... 省略....
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
.... 省略....
}
addWorker(Runnable firstTask, boolean core) {
.... 省略....
if (workerAdded) {
t.start();// 调用了实现了Runnable接口的worker
workerStarted = true;
}
.... 省略....
}
// Worker里面的run()方法
runWorker(this);
}
final void runWorker(Worker w) {
.... 省略....
(task = getTask()) != null) {......}
.... 省略....
}
```
以下是一些附件:
- 生命周期流程:
- 线程池corePoolSize=0的代码例子:
- 参考链接: https://www.jianshu.com/p/a977ab6704d7