部分源码分析
invokeAll() →→→→执行完全部"任务",方法结束
public abstract class AbstractExecutorService implements ExecutorService {
//省略其他方法
/**批量执行任务,只有这些任务全部执行完成之后,方法才会返回*/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
if (tasks == null)//如果传入"待执行任务"为空,则报错:"空"异常
throw new NullPointerException();
//定义将要执行的"任务线程"集合
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
//遍历"任务"列表
for (Callable<T> t : tasks) {
//将"任务" 通过 new FutureTask<T>(callable); 方式转换为 RunnableFuture
// class FutureTask<V> implements RunnableFuture<V>
RunnableFuture<T> f = newTaskFor(t);
//将待执行"任务"放入列表
futures.add(f);
//执行该"任务"
execute(f);
}
//遍历"正在执行的新任务列表"
for (int i = 0, size = futures.size(); i < size; i++) {
//获取当前任务
Future<T> f = futures.get(i);
//判断该"任务"是否完成
if (!f.isDone()) {
//如果没有完成
try {
//则阻塞,等待其完成
f.get();
} catch (CancellationException ignore) {
//捕获 任务取消异常
} catch (ExecutionException ignore) {
//捕获 任务执行异常
}//除捕获异常外,抛出其他异常,使调用者处理
}
}
done = true;
return futures;
} finally {// finally 是为了防止发生未知异常,而做的异常相应处理
if (!done)
//如果此次执行(当前invokeAll()方法),没有正常完成,则取消列表中的所有任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
/**指定时间之内的批量任务,未执行及未执行完成的任务将取消
* 这里的timeout 是指所有任务需要完成的总时间,并不是每个单个任务分配的时间
*/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
//先将任务创建完备,放入集合,避免使用户设置的超时时间,耗费到不直接相关的操作上(使任务执行的计算时间更加精准)
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
//计算死亡时间, 当前虚拟机纳秒时间+用户设置时间(纳秒)偏移量 = 死亡时间(任务强制结束时间)
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
for (int i = 0; i < size; i++) {
//执行第i个任务
execute((Runnable)futures.get(i));
//每次都需要计算是否已经到达约定时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
//如果已经到达时间要求,则直接返回,退出方法
return futures;
}
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
//如果当前任务没有结束,则进行逻辑判断
if (!f.isDone()) {
//如果已经到达约定时间,则直接返回
if (nanos <= 0L)
return futures;
try {
//如果时间没到,则调用get()方法,等待当前任务执行完成,等待最长时间即为约定剩余时间nanos
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
//如果发生取消异常/执行异常/超时异常,则方法结束
return futures;
}
//计算剩余时间
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
}
invokeAny() →→→→ 执行完任一"任务",即结束方法
public abstract class AbstractExecutorService implements ExecutorService {
/**执行完任一给定列表中的方法,即为方法结束*/
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
//断言为否,不可能发生(本身方法不具有超时时间的特性,所以不可能发生)
assert false;
return null;
}
}
/**给定时间内,执行完任一给定列表中的方法,即为方法结束*/
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
/**InvokeAny执行体**/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null) //参数非null判断(传入的任务为null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0) //非法参数异常(传入的"任务"数量为0)
throw new IllegalArgumentException();
//创建"任务"列表
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
try {
ExecutionException ee = null;
//判断有没有约定结束时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//迭代器
Iterator<? extends Callable<T>> it = tasks.iterator();
//将第一个任务迭代出转变为Future放入待执行任务列表
futures.add(ecs.submit(it.next()));
//任务数量总数,用于计算数量,放入待执行列表一个,所以数量减一
--ntasks;
//定义当前执行任务初始数量为1
int active = 1;
//无条件判断的 while 循环
for (;;) {
//获取任务线程执行结果(查看是否存在执行完的任务)
Future<T> f = ecs.poll();
//如果任一任务线程都没有执行完
if (f == null) {
//如果,还存在任务没有开始执行
if (ntasks > 0) {
//待执行任务数减一
--ntasks;
//执行任务 -----------------------(涉及其他相关类源码 : 目的即执行任务,如果任务执行完成,则将结果放入统一的队列中)
futures.add(ecs.submit(it.next()));
//当前已开始任务数加一
++active;
}
else if (active == 0)
break;
else if (timed) { //如果有约定时间限制
//则同理基础上,附加时间限制
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
//如果存在任务线程执行完了,获取到了结果
if (f != null) {
//因为当前任务执行完了,所以 计数器active减一
--active;
try {
//返回当前任务执行结果
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
for (int i = 0, size = futures.size(); i < size; i++)
//取消剩余任务
futures.get(i).cancel(true);
}
}
}