0
点赞
收藏
分享

微信扫一扫

关于ThreadPoolExecutor 调用RejectedExecutionHandler的机制

当我们创建线程池并且提交任务失败时,线程池会回调RejectedExecutionHandler接口的rejectedExecution(Runnable task, ThreadPoolExecutor executor)方法来处理线程池处理失败的任务,其中task 是用户提交的任务,而executor是当前执行的任务的线程池。可以通过代码的方式来验证。

1、线程池工厂:

package          com.threadpool;         

import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

/**
* 线程池工厂方法
* @author
*
*/
public class ThreadPoolFactory {

//线程池
private static ThreadPoolExecutor pool;
//自身对象
private static ThreadPoolFactory factory;

/**
* 私有构造函数
*/
private ThreadPoolFactory(){ }

/**
* 获取工厂对象
* @param config
* @return
*/
public static ThreadPoolFactory getInstance(ThreadPoolConfig config){
if (factory == null ){
factory = new ThreadPoolFactory();
}

if (pool == null ){

if (config.getHandler() == null ){
pool = new ThreadPoolExecutor(config.getCorePoolSize(),
config.getMaximumPoolSize(),config.getKeepAliveTime(),
config.getUnit(),config.getWorkQueue());
} else {
pool = new ThreadPoolExecutor(config.getCorePoolSize(),
config.getMaximumPoolSize(),config.getKeepAliveTime(),
config.getUnit(),config.getWorkQueue(),config.getHandler());
}
}
System.out.println( "pool create= " +pool.toString());
return factory;
}

/**
* 添加线程池任务
* @param run
*/
public synchronized void addTask(Runnable run){
pool.execute(run);
}

/**
* 添加线程池任务
* @param runs
*/
public synchronized void addTask(List<Runnable> runs){
if (runs != null ){
for (Runnable r:runs){
this .addTask(r);
}
}
}

/**
* 关闭线程池
*/
public void closePool(){
pool.shutdown();
}

}

 

2、线程池配置文件类:

 

package          com.threadpool;         

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;

/**
* 线程池配置类
* @author
*
*/
public class ThreadPoolConfig {
//池中所保存的线程数,包括空闲线程。
private int corePoolSize;
//池中允许的最大线程数。
private int maximumPoolSize;
//当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
private long keepAliveTime;
//参数的时间单位。
private TimeUnit unit;
//执行前用于保持任务的队列。此队列仅由保持 execute 方法提交的 Runnable 任务。
private BlockingQueue<Runnable> workQueue;
//由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
private RejectedExecutionHandler handler;
//配置文件自身对象
private static ThreadPoolConfig config;
/**
* 单例模式
*/
private ThreadPoolConfig(){

}

/**
* 获取配置文件对象
* @return
*/
public static ThreadPoolConfig getInstance(){
if (config == null ){
config = new ThreadPoolConfig();
}
return config;
}
public int getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize( int corePoolSize) {
this .corePoolSize = corePoolSize;
}
public int getMaximumPoolSize() {
return maximumPoolSize;
}
public void setMaximumPoolSize( int maximumPoolSize) {
this .maximumPoolSize = maximumPoolSize;
}
public long getKeepAliveTime() {
return keepAliveTime;
}
public void setKeepAliveTime( long keepAliveTime) {
this .keepAliveTime = keepAliveTime;
}
public TimeUnit getUnit() {
return unit;
}
public void setUnit(TimeUnit unit) {
this .unit = unit;
}
public BlockingQueue<Runnable> getWorkQueue() {
return workQueue;
}
public void setWorkQueue(BlockingQueue<Runnable> workQueue) {
this .workQueue = workQueue;
}
public RejectedExecutionHandler getHandler() {
return handler;
}
public void setHandler(RejectedExecutionHandler handler) {
this .handler = handler;
}
}

3、简单任务类:

 

package          com.test;         

/**
* 任务线程
* @author
*
*/
public class ThreadTask extends Thread {

public ThreadTask(String name){
super (name);
}

@SuppressWarnings ( "static-access" )
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println( this .getName().toString() + ", will sleep 0 s" );
try {
this .sleep( 1 * 10 );
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println( this .getName().toString() + ", I am wakeup now " );
}

}

4、异常处理接口实现类:

package          com.threadpool;         

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
* 线程池异常处理类
* @author
*
*/
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
// TODO Auto-generated method stub
System.out.println( "Begin exception handler-----------" );
//执行失败任务
new Thread(task, "exception by pool" ).start();
//打印线程池的对象
System.out.println( "The pool RejectedExecutionHandler = " +executor.toString());
}
}


5、测试主函数:

package          com.test;         

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.threadpool.MyRejectedExecutionHandler;
import com.threadpool.ThreadPoolConfig;
import com.threadpool.ThreadPoolFactory;

/**
* @author
*
*/
public class TestThreadPoolMain {

/**
* @param args
*/
public static void main(String[] args) {

//设置配置
ThreadPoolConfig config = ThreadPoolConfig.getInstance();
config.setCorePoolSize( 2 );
config.setMaximumPoolSize( 3 );
config.setKeepAliveTime( 5 );
config.setUnit(TimeUnit.SECONDS);
//将队列设小,会抛异常
config.setWorkQueue( new ArrayBlockingQueue<Runnable>( 10 ));
config.setHandler( new MyRejectedExecutionHandler());
//线程池工厂
ThreadPoolFactory factory = ThreadPoolFactory.getInstance(config);

for ( int i = 0 ;i< 100 ;i++){
factory.addTask( new ThreadTask(i+ "-i" ));
}
System.out.println( "i add is over!-------------------" );
}
}

6、测试比较:

可以看出创建的线程池对象和调用传递的线程池对象是相同的。

pool create = java.util.concurrent.ThreadPoolExecutor@de6f34
0-i, will sleep 0 s
Begin exception handler-----------
12-i, will sleep 0 s
The pool RejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor@de6f34
Begin exception handler-----------
1-i, will sleep 0 s
The pool RejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor@de6f34

举报

相关推荐

0 条评论