好了,我们接着上一篇来讲,当然,福利也都会一样赠上
workQueue:
指被提交但未执行的任务队列,它是一个BlockingQueue
接口的对象,仅用于存放Runnable
对象,根据队列功能分类,在ThreadPoolExecutor
的构造函数中可使用以下几种BlockingQueue
:
-
直接提交的队列:该功能由
SynchronousQueue
对象提供。SynchronousQueue
是一个特殊的阻塞队列,它没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。如果使用这个队列,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲的线程,则尝试创建新的工作线程,如果线程数量已经达到最大值(maximumPoolSize),则执行拒绝策略。因此,使用SynchronousQueue
队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。 -
有界的任务队列:有界的任务队列可以使用带有队列最大容量的
ArrayBlockingQueue
实现。有界队列仅在任务队列装满时,才可能将线程数提升到corePoolSize
以上,换言之,除非系统非常繁忙,否则确保核心线程数维持在corePoolSize
以内。 -
无界的任务队列:无界的任务队列可以通过
LinkedBlockingQueue
类实现,与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。若任务创建速度远远大于处理速度,无界队列会快速增长,直到耗尽系统内存。 -
优先任务队列:优先任务队列是带有执行优先级的队列。它通过
PriorityBlockingQueue
实现,可以控制任务的执行先后顺序。它是以特殊的无界队列。它可以根据任务自身的优先级顺序来执行任务,在确保系统性能的同时,也能有很好的质量保证(总是确保高优先级的任务先执行)
回顾newFixedThreadPool()
方法的实现。它返回了一个corePoolSize
和maximumPoolSize
大小一样的,使用了LinkedBlockingQueue
任务队列的线程池。因为固定大小的线程池不会有线程数量的动态变化。由于它使用无界队列存放无法立即执行的任务,当任务提交非常频繁时,该队列会迅速膨胀,从而耗尽系统资源。
newSingleThreadExecutor()
返回的是单线程线程池,是newFixedThreadPool()
方法的一种退化,只是简单的将线程池数量设为1。
newCachedThreadPool()
方法返回corePoolSize
是0,maximumPoolSize
无穷大的线程池,这意味着在没有任务时,该线程池内无线程,而当任务被提交时,该线程池会使用空闲的线程执行任务;若无空闲线程,则将任务加入SynchronousQueue
队列,而这个队列是一个直接提交的队列,它总会迫使线程池增加新的线程执行任务。当任务执行完毕后,由于corePoolSize
是0,因此空闲线程又会在指定时间(60秒)被回收。
对于newCachedThreadPool()
,如果同时有大量任务被提交,而任务的执行又不那么快时,那么系统便会开启等量的线程处理,这样做可能会很快耗尽系统的资源。。
这里给出ThreadPoolExecutor
线程池的核心调度代码:
/*
* 先看一下addWorker这个方法
* @param core true 使用 corePoolSize 作为上限, 否则使用maximumPoolSize
* @return 成功返回true
*/
private boolean addWorker(Runnable firstTask, boolean core);
/**
* 在未来某时运行给定的任务,任务可能在新线程中运行,也可能在线程池中的线程中运行。
*
* 如果一个任务不能被提交(submit),不是因为executor已经被关闭,就是已经达到了executor的maximumPoolSize,
* 这个任务就会被当前的RejectedExecutionHandler来处理
*
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//当工作线程总数小于corePoolSizes时
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//添加新线程失败,重新获取ctl的值
c = ctl.get();
}
//进入等待队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//进入等待队列失败(有界队列达到上限或者使用了SynchronousQueue)
//直接提交给线程池,如果线程数量达到maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
拒绝策略
ThreadPoolExecutor的最后一个参数指定了拒绝策略。当任务数量超过系统实际承载能力时的处理策略。内置了四种拒绝策略:
-
AbortPolicy:该策略会直接抛出异常,阻止系统正常工作
-
CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。
-
DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务
-
DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。
以上内置的策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际应用需要,可以自己扩展RejectedExecutionHandler接口。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
下面的代码简单地演示了自定义线程池和拒绝策略的使用:
public class RejectedThreadPoolDemo {
private static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID:"
+ Thread.currentThread().getId());
try{
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTask task = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " is discard");
}
});
for (int i = 0; i < Integer.MAX_VALUE; i++) {
es.submit(task);
Thread.sleep(10);
}
}
}
上述的代码定义了一个线程池。该线程池有5个常驻线程,并且最大线程数量也是5个。但它有一个只有10个容量的等待队列。因此使用无界队列很可能并不是最佳解决方案,如果任务量极大,很可能会把内存撑爆。给出一个合理的队列大小,也是合理的选择。同时,这里自定义了拒绝策略,我们不抛出异常,因为万一在任务提交端没有进行异常处理,则有可能使得整个系统都崩溃瓶,这不是我们希望遇到的,但作为必要的信息记录,我将任务丢弃的信息进行打印。
自定义线程创建:ThreadFactory用来创建线程池需要的线程:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
当线程池需要新建线程时,就会调用这个方法。自定义线程池可以跟踪何时创建了多少线程,也可以自定义线程的名称、组以及优先级等信息。下面的案例使用自定义的ThreadFactory,一方面记录了线程的创建,另一方面将所有的线程都设置为守护线程:
MyTask task = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
System.out.println("create " + t);
return t;
}
});
for (int i = 0; i < 5; i++) {
es.submit(task);
}
Thread.sleep(2000);
扩展线程池ThreadPoolExecutor是一个可以扩展的线程池。它提供了beforeExecute()、afterExecute()、terminated()三个接口对线程池进行控制。在ThreadPoolExecutor.Worker.runTask()方法内部提供了这样的实现:
boolean ran = false;
beforeExecute(thread,task); //运行前
try{
task.run(); //运行任务
ran = true;
afterExecute(task,null); //运行结束后
++completedTasks;
}catch (RuntimeException ex) {
if (!ran)
afterExecute(task,ex); //运行结束
throw ex;
}
下面演示了对线程池的扩展,在这个扩展中,我们记录每一个任务的执行日志
package com.ha;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExtThreadPool {
private static class MyTask implements Runnable{
public String name;
public MyTask(String name){
this.name = name;
}
@Override
public void run() {
System.out.println("正在执行" + ":Thread ID:" + Thread.currentThread().getId()
+ ",Task Name=" + name);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行:"+((MyTask)r).name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完成:"+((MyTask)r).name);
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
for (int i = 0; i < 5; i++) {
MyTask task = new MyTask("TASK-"+i);
es.execute(task);
Thread.sleep(10);
}
es.shutdown();
}
}
输出:
准备执行:TASK-0
正在执行:Thread ID:10,Task Name=TASK-0
准备执行:TASK-1
正在执行:Thread ID:11,Task Name=TASK-1
准备执行:TASK-2
正在执行:Thread ID:12,Task Name=TASK-2
准备执行:TASK-3
正在执行:Thread ID:13,Task Name=TASK-3
准备执行:TASK-4
正在执行:Thread ID:14,Task Name=TASK-4
执行完成:TASK-0
执行完成:TASK-1
执行完成:TASK-2
执行完成:TASK-3
执行完成:TASK-4
线程池退出
在线程池中寻找堆栈
首先看一个简单的案例,我们有一个Runnable接口,它用来计算两个数的商:
package com.ha;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DivTask implements Runnable{
int a,b;
public DivTask(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public void run() {
double re = a/b;
System.out.println(re);
}
public static void main(String[] args) {
ThreadPoolExecutor pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
for (int i = 0; i < 5; i++) {
pools.submit(new DivTask(100,i));
}
}
}
输出:
100.0
50.0
33.0
25.0
从这个for循环来看,我们应该会得到5个结果,但是实际上只有4个。也就是说程序漏算了一组数据!但是没有任何日志,没有任何错误提示。在这个简单的案例中,只要仔细一点,就会发现,作为除数的i取到了0,这个缺失的值很可能是由于除以0导致的。但是在复杂的业务场景中,这种错误很难发现。
因此,使用线程池虽然是好事,但是还是得处处留意这些坑。线程池和可能会“吃”掉程序抛出的异常,导致我们对程序的错误一无所知。
那么,如何向线程池讨回异常堆栈呢?一种最简单的方法及时放弃submit(),改用execute():pools.execute(new DivTask(100,i));
或者使用下的方法改造submit():
Future re = pools.submit(new DivTask(100,i));
re.get();
输出
100.0
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero50.0
at com.ha.DivTask.run(DivTask.java:18)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
33.0
25.0
注意了,任务的具体提交位置已经被线程池淹没了,顺着堆栈,只能找到线程池中的调度流程,而这对于我们几乎是没有价值的。
我们只好扩展ThreadPoolExecutor线程池,让它在调度任务之前,先保存一下提交任务线程的堆栈信息。如下所示:
package com.ha;
import java.util.concurrent.*;
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
private Exception clientTrace(){
return new Exception("Client stack trace");
}
private Runnable wrap(final Runnable task,final Exception clientStack,
String clientThreadName){
return new Runnable() {
@Override
public void run() {
try{
task.run();
}catch (Exception e){
clientStack.printStackTrace();
throw e;
}
}
};
}
@Override
public void execute(Runnable task) {
super.execute(wrap(task,clientTrace(),Thread.currentThread().getName()));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task,clientTrace(),Thread.currentThread().getName()));
}
public static void main(String[] args) {
ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
for (int i = 0; i < 5; i++) {
pools.execute(new DivTask(100,i));
}
}
}
wrap()方法的第2个参数为一个异常,里面保存着提交任务的线程的堆栈信息。该方法将我们传入的Runnable任务进行一层包装,使之能处理异常信息。当任务发生异常时,这个异常会被打印。
输出:
100.0
java.lang.Exception: Client stack trace
50.0
33.0
25.0
at com.ha.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:11)
at com.ha.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:31)
at com.ha.TraceThreadPoolExecutor.main(TraceThreadPoolExecutor.java:44)
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at com.ha.DivTask.run(DivTask.java:18)
at com.ha.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:20)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
现在,我们不仅可以得到异常发生的Runnable实现内的信息,我们也知道了这个任务是在哪里提交的。