目录
线程池之拒绝策略(RejectedExecutionHandler )
线程
线程是调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型,JVM使用的KLT模型,Java线程与OS线程保持1:1的映射关系,也就是说有一个java线程在操作系统里面也会有一个对应的线程,Java线程有多种声明状态
相关类的继承关系:
线程池(ThreadPool)
线程池,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配,调优和监控
在web开发中,服务器需要接受并处理请求,所以会为一个请求分配一个线程来处理,如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:
如果并发的请求量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率,可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多.
那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢???
这就是线程池的目的了,线程池为线程生命周期的开销和资源不足提供了解决方案,通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上.
什么时候使用线程池???
- 单个任务处理时间比较短
- 需要处理的任务数量很大
优势:
- 1,线程复用,降低线程创建和销毁造成的消耗(降低资源消耗)
- 2,提高响应速度,当任务到达时不需要等待线程的创建就能执行
- 3,提高线程的可管理性,控制最大并发数.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控.
线程的实现方式:
//实现Runnable接口的类将被Thread执行,表示一个基本任务
@FunctionalInterface
public interface Runnable {
//run方法就是它实际要执行的内容
public abstract void run();
}
//Callable同样表示一个基本任务,与Runnable接口区别在于它接收泛型,同时它执行完任务后带有返回//内容
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
继承(实现)结构说明:
从图中可以看出Executor下有一个重要的子接口ExecutorService,其中定义了线程池的具体的行为
- execute(Runable command):执行Runnable类型的任务
- submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
- shutdown():在完成已提交的任务后封闭线程池,不再接收新的任务
- shutdown():停止所有的任务后封闭线程池
- isTerminated():测试是否所有的任务都执行完毕
- isShutdown():测试是否该ExecutorService已被关闭.
Executors是一个工具类,提供了常见配置线程池的方法
- 线程池的核心实现类是ThreadPoolExecutor
- 任务:也就是工作单元,包括被执行任务需要实现的接口:Runnable接口或者Callable接口;
- 任务的执行:也就是把任务分派给多个线程的执行机制,包括Executor接口及继承自Executor接口的ExecutorService接口。
- 异步计算的结果:包括Future接口及实现了Future接口的FutureTask类。
- 线程池执行现成的两种方式submit()和execute()
- 其中Callable对象有返回值,因此使用submit()方法;
- Runnable无返回值可以使用execute()方法,此外还可以使用submit()方法.当使用submit方法时:使用submit(Runnable task)或者submit(Runnable task, Object result)方法,使用submit(Runnable task)方法返回的null或者使用submit(Runnable task, Object result)方法返回result。
线程池的重点属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1;
ctl是对线程池的运行状态和线程中有效线程的数量进行控制的一个字段,它包含两部分信息,线程池的运行状态(runState)和线程池内有效线程的数量(workCount),这里可以看到使用了integer类型来保存;高3位保存runState,低29位保存workCount,COUNT_BITS就是29,CAPACITY就是1左移29位减1(29个1)这个常量表示workCount的上限值,大约是5亿
ctl相关方法
private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
- runStateOf:获取运行状态
- workerCountOf:获取活动线程数
- ctlOf:获取运行状态和活动线程数的值
线程池存在5种状态:
private static final int RUNNING = -1 << COUNT_BITS; //高3位为111
private static final int SHUTDOWN = 0 << COUNT_BITS; //高3位为000
private static final int STOP = 1 << COUNT_BITS; //高3位为001
private static final int TIDYING = 2 << COUNT_BITS; //高3位为010
private static final int TERMINATED = 3 << COUNT_BITS; //高3位为011
图形化解释:
- RUNNING
- 状态说明:线程池处于RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理
- 状态切换:线程池的初始化状态是RUNNING,换句话说,线程池一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0
- SHUTDOWN
- 状态说明:线程池处于SHUTDOWW状态时,不接受新任务,但能处理已添加的任务
- 状态切换:调用线程池的shutdown()接口时,线程池有RUNNING->SHUTDOWW
- STOP
- 状态说明:线程池处于STOP 状态时,不接受新任务,不处理已添加的任务,并且会中断正在处理的任务.
- 状态切换:调用线程池的shutdownNow()接口时,线程池有RUNNING or SHUTDOWW ->STOP
- TIDYING
- 状态说明:当所有的任务已终止,ctl记录的任务数量为0,线程池就会变为TIDYING状态,会执行钩子函数terminated().terminated()函数在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理,可以通过重载terminated()函数来实现.
- 状态切换:线程在SHUTDOWN状态下,阻塞队列为空并且线程池中的执行的任务也为空时,就会由SHUTDOWN->TIDYING,当线程在STOP状态下,线程池中执行的任务为空时,就会由STOP->TIDYING
- TERMINATED
- 状态说明:线程池彻底终止,就变成TERMINATED状态.
- 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由TIDYING->TERMINATED,进入TERMINATED的条件如下:
- 线程池不是RUNNING状态
- 线程池状态不是TIDYING状态或者TERMINATED状态
- 如果线程池状态是SHUTDOWN状态并且workQueue为空
- workQueue为0
- 设置TERMINATED状态成功
线程池的执行示意图:
- 在创建了线程池后,开始等待请求
- 当调用execute()方法将任务提交给线程池,线程池刚刚创建,里面一个线程都没有;优先创建出来的线程叫做核心线程
- execute()方法不断提交任务,线程池会做出判断,如果正在运行的的线程数量小于核心线程数,马上创建核心线程执行任务
- 注意: 如果提交第一个任务由创建的第一个核心线程已经执行完任务后,提交的第二个任务不会使用第一个核心线程,而是会创建第二个核心线程,直到达到设置的核心线程数后,此时才回去复用创建的核心线程
- 如果正在运行的线程数量大于等于核心线程数,将这个任务放入队列,当核心线程执行完其他任务后,去队列中取任务
- 如果队列满了,正在运行的线程数量小于最大线程数,创建非核心线程来执行任务
- 如果队列满了,正在运行的线程数量等于最大线程数(非核心线程+最大线程数),启动饱和聚合策略
- 当一个线程完成任务时,他会从队列中取下一个任务来执行,
- 当线程池中线程数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余线程会被销毁直到只剩下corePoolSize个线程为止
图解上述过程:
//接受一个Runnable接口(ThreadPoolExecutor)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
*分三步进行:
* 1. 如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个线程,
* 启动一个新线程任务。对addWorker的调用会自动检查运行状态和 * workerCount,从而防止可能增加在不应该的情况下,返回false。
* 2. 如果任务可以成功排队,那么我们仍然需要再次检查我们是否应该添加一个线程
* (因为自上次检查以来,已有的已经死亡)或者自进入此方法后,池已关闭。所以 * 我们重新检查状态,如有必要,在以下情况下回滚排队已停止,如果没有线程, * 则启动新线程。
* 3. 如果无法将任务排队,则尝试添加新任务线。如果失败了,我们知道我们已经被
* 关闭或饱和了,所以拒绝这个任务。
*/
int c = ctl.get();
//得到当前线程池中线程的数量
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//判断当前线程池的生命状态;符合前一个条件,放到队列里面
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果队列满了,创建非核心线程
else if (!addWorker(command, false))
//如果非核心线程已经创建满了,执行拒绝策略
reject(command);
}
线程池之阻塞队列
所有已知实现类
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列(带上容量)
- LinkedBlockingQueue:由链表组成的有界阻塞队列(大小默认值integer.max_value)
- SynchronousQueue(同步移交队列 ):这个队列不存储元素每个插入put()的操作必须等到另一个线程去调用删除take()操作类似于消费者一对一的缓冲作用
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列
- LinkedTransferQueue:由链表组成的无界阻塞队列
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列
- LinkedBlockingDeque:由链表组成的双向阻塞队列
上面的阻塞队列都会有下面的这些方法
方法类型 | 抛出异常 | 有返回值,不抛异常 | 阻塞等待(死等) | 超时等待 |
---|---|---|---|---|
添加 | add(e) | Offer(e) | Put(e) | Offer(e,time,unit) |
移除 | remove | Poll() | Take | Poll(time,unit) |
检查 | element() | Peek | (不可用) | (不可用) |
类型 | 解释 |
---|---|
抛出异常 | 当阻塞队列满时,再往队列里面add插入元素会抛出IllegalStateException:Queue full 当阻塞队列空时,再往队列里移除元素时会抛NoSuchElementException |
特殊值 | 插入方法时,成功true,失败false; 移除方法时,成功返回队列的元素,队列里没有就返回null |
一直阻塞 | 当阻塞队列满时,继续往队列里面put元素,队列会一直阻塞生产线程知道put线程或者响应中断退出 当阻塞队列为空,消费者视图从队列里面take元素,队列会一直阻塞消费者线程直到队列可用 |
超时退出 | 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出 |
队列要根据场景去选择:如果不希望任务被丢弃建议用无界队列,或者考虑到JVM的容量的问题可以自己去定义一个拒绝策略把任务存放到Redis中,监控当阻塞队列的容量小于50%时,把redis里面的任务放到阻塞队列里面
线程池之拒绝策略(RejectedExecutionHandler )
- AbortPolicy终止策略(默认使用该拒绝策略)抛出异常 RejectedExecutionException
/**
* 被拒绝任务的处理程序,抛出{@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
- DiscardPolicy抛弃策略:默默地丢弃无法处理的任务,不予任何处理也不抛出异常,如果允许任务丢失,这是最好的一种策略
/**
* 被拒绝任务的处理程序,会自动放弃被拒绝的任务。
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* 不执行任何操作,这会产生丢弃任务r的效果。
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
- DiscardOldestPolicy抛弃旧任务策略:抛弃阻塞队列中的旧任务,不会抛出异常
/**
* 一种被拒绝任务的处理程序,它丢弃最早的未处理请求,然后重试{@code execute},除
* 非执行器关闭,在这种情况下任务被丢弃。
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* 获取并忽略执行器将执行的下一个任务(如果一个任务立即可用),然后重试执行任务
* r,除非执行器关闭,在这种情况下,任务r将被丢弃。
*
* @param r 请求执行的可运行任务
* @param e 试图执行此任务的执行者
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
- CallerRunsPolicy调用者运行策略:主线程执行多余的任务(既不会抛出异常,也不会抛弃任务,将某些任务回退到调用者)
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
*在调用者的线程中执行任务r,除非执行器已关闭,在这种情况下,任务被丢弃。
*
* @param r 请求执行的可运行任务
* @param e 试图执行此任务的执行者
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
//Demo:
public class Main {
public static void main(String[] args) {
//主线程提交任务
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("调用当前任务的线程:"+Thread.currentThread().getName());
}
};
runnable.run();
}
}
//打印结果 调用当前任务的线程:main
- 自定义拒绝策略 需要实现RejectedExecutionHandler接口
根据需要选择使用的拒绝策略
监控线程池的运行状态
需要关注ThreadPoolExecutor中的四个方法
- Shutdown():启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。
- ShutdownNow():尝试停止所有主动执行的任务,停止等待任务的处理,并返回正在等待执行的任务列表。
- beforeExecutor(Thread t, Runnable r):在给定的线程中执行给定的Runnable之前调用方法
- afterExecutor(Runnable r, Throwable t):完成指定Runnable的执行后调用方法。
部分源码分析
详细的源码分析会再出一篇文章
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//通过线程工厂创建线程,并且把子集作为参数传进去
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//判断线程池的状态
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个Worker:并且把任务放到Worker里面,初始化Worker
//里面已经通过线程工厂创建了一个新的线程
w = new Worker(firstTask);
//将Woreker里面的线程赋值给t
final Thread t = w.thread;
//判断t不为空
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//执行加锁:控制并发
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//状态的判断
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//把w放到works集合里面
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//最终会去调用work里面的run()方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//调用的是线程池里面的runWorker()方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//拿到Worker里面的firstTask
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//while循环(线程可以重用的原因)
//task = getTask() == null会跳出循环
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
//当遇到异常时,这句话不会执行
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//不断的从队列里面去取
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//如果时间超时或者当前的worker大于核心数量返回true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
//当队列为空条件不满足时阻塞
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//当前线程数小于核心线程数时:走这个逻辑
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
//当遇到异常时会走到这个里面completedAbruptly为true
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
//判断当前线程池的状态是否小于STOP,也就是:RUNNING SHUTDOWN
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//重新增加一个worker
addWorker(null, false);
}
}