ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 10, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
executor.execute(thread1);
一. 对ThreadPoolExecutor进行初始化的时候主要做了以下几件事情:
1.参数Executors.defaultThreadFactory():初始化默认线程工厂,此线程工厂实现ThreadFactory接口(里面维护了一个newThread()方法,在executor调用execute方法里面对Worker这个内部类做了初始化,然后通过调用TreadFactory的newThread()方法创建新线程并赋值给Worker里维护的Thread成员变量)。
2.对ThreadPoolExecutor中的一些new出来的成员变量进行初始化,包括:
1)、ctl(记录线程池状态(前三位)和正在工作的线程数量(后29位))
2)、mainLock(主锁,在对工作集合内容进行添加删除时会掉用主锁来锁住这个过程)
3)、workers(工作线程的容器,又叫线程池,里面维护了多个worker对象,worker实现了Runnable接口本身可以作为线程使用,worker里面维护了业务任务、一个由DefaultThreadFactory创建并赋值的线程以及一个记录每个线程任务数量的long变量)
4)、termination(mianLock.newCondition()出来的Condition对象,未具体探究)
- 根据构造方法里的参数值对ThreadPoolExecutor里的变量进行赋值
问题小结:
用new创建并初始化对象的步骤:
1)给对象的实例变量(非常量)分配内存空间,默认初始化成员变量。
2)成员变量声明时的初始化;
3)初始化块初始化(又称为构造代码块和非静态代码块);
4) 构造方法的初始化;
二. 执行execute() 方法里面做了下面的事:
1.当前工作线程数小于核心线程数,则向线程池(set集合)添加用于处理任务的线程。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
2.工作线程数大于核心线程数、线程池的工作状态是Running状态(线程池接受新任务并处理队列的任务),然后向队列添加任务成功,添加任务成功后再次获取线程池的状态值(以为在此期间,线程池的状态有可能发生变化),判断状态是否是Running,如果是则删除队列里的任务,执行拒绝策略,否则在Running状态下往线程池里添加一个线程用来处理任务队列里的任务。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
3.当工作线程数大于核心线程数并且向任务队列添加任务失败前提下,增加线程池的数量(corePool++),如果增加失败则执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
addWorker()方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
主要做了两件事:一个是对线程池当前状态下处理任务的线程数量的递增(CAS操作);
一个是在线程池当前状态下,向线程池添加处理任务的线程,在添加成功的前提下,启动Worker里的线程来处理Worker里的任务(加了ReentrantLock的锁操作)。
需要注意的是这种处理结构:
Worker实现了Runnable接口,本身是一个线程(这个线程就是处理任务的线程),而具体的任务赋值给Worker内部的变量Runnable,由Worker内部维护的Thread(ThreadPoolExecutor初始化时,为这里的Thread变量赋值)来处理。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker类中的runWorker(this)方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果前面的firstTask有值,就直接执行这个任务
//如果没有具体的任务,就执行getTask()方法从队列中获取任务
//这里会不断执行循环体,除非线程中断或者getTask()返回null才会跳出这个循环
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker()方法做了响应中断的处理:
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
其中getTask()方法(CAS操作)这里面涉及到keepAliveTime的使用,从这个方法我们可以看出先吃池是怎么让超过corePoolSize的那部分worker销毁的。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 如果设置了核心worker也会超时或者当前正在运行的worker数量超过了corePoolSize,就要根据时间判断是否要销毁线程了
//其实就是从队列获取任务的时候要不要设置超时间时间,如果超过这个时间队列还没有任务进来,就会返回null
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
// take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
问题小结:
1.短路与(&&)的使用,若前面为false则后面不执行;短路或(||)的使用,若前面为true则后面不执行。
2.线程池的五个状态:(负数在java里-用1表示,绝对值的补码表示具体值)
Running (-1<<29 )
111 00000000000000000000000000000
接受新任务并处理队列里的任务
ShutDown (0<<29)
000 00000000000000000000000000000
不接受新任务但处理队列里的任务
Stop (1<<29)
001 00000000000000000000000000000
不接受新任务,不处理队列里的任务,中断正在处理的任务
Tidying (2<<29)
010 00000000000000000000000000000
所有任务都已结束,workCount为0,线程过渡到TIDYING状态,将会执行terminated()钩子方法
Terminated (3<<29)
011 00000000000000000000000000000
terminated()完成