0
点赞
收藏
分享

微信扫一扫

Java 线程池体系 - AbstractExecutorService

圣杰 2022-03-27 阅读 51

部分源码分析

invokeAll() →→→→执行完全部"任务",方法结束
public abstract class AbstractExecutorService implements ExecutorService {

    //省略其他方法
    
    /**批量执行任务,只有这些任务全部执行完成之后,方法才会返回*/
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        if (tasks == null)//如果传入"待执行任务"为空,则报错:"空"异常
            throw new NullPointerException();
        //定义将要执行的"任务线程"集合
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            //遍历"任务"列表
            for (Callable<T> t : tasks) {
                //将"任务" 通过 new FutureTask<T>(callable); 方式转换为 RunnableFuture
                // class FutureTask<V> implements RunnableFuture<V> 
                RunnableFuture<T> f = newTaskFor(t);
                //将待执行"任务"放入列表
                futures.add(f);
                //执行该"任务"
                execute(f);
            }
            //遍历"正在执行的新任务列表"
            for (int i = 0, size = futures.size(); i < size; i++) {
                //获取当前任务
                Future<T> f = futures.get(i);
                //判断该"任务"是否完成
                if (!f.isDone()) {
                    //如果没有完成
                    try {
                        //则阻塞,等待其完成
                        f.get();
                    } catch (CancellationException ignore) {
                        //捕获 任务取消异常
                    } catch (ExecutionException ignore) {
                        //捕获 任务执行异常
                    }//除捕获异常外,抛出其他异常,使调用者处理
                }
            }
            done = true;
            return futures;
        } finally {// finally 是为了防止发生未知异常,而做的异常相应处理
            if (!done)
                //如果此次执行(当前invokeAll()方法),没有正常完成,则取消列表中的所有任务
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

    /**指定时间之内的批量任务,未执行及未执行完成的任务将取消
    * 这里的timeout 是指所有任务需要完成的总时间,并不是每个单个任务分配的时间
    */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            //先将任务创建完备,放入集合,避免使用户设置的超时时间,耗费到不直接相关的操作上(使任务执行的计算时间更加精准)
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            //计算死亡时间, 当前虚拟机纳秒时间+用户设置时间(纳秒)偏移量 = 死亡时间(任务强制结束时间)
            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            for (int i = 0; i < size; i++) {
                //执行第i个任务
                execute((Runnable)futures.get(i));
                //每次都需要计算是否已经到达约定时间
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    //如果已经到达时间要求,则直接返回,退出方法
                    return futures;
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                //如果当前任务没有结束,则进行逻辑判断
                if (!f.isDone()) {
                    //如果已经到达约定时间,则直接返回
                    if (nanos <= 0L)
                        return futures;
                    try {
                        //如果时间没到,则调用get()方法,等待当前任务执行完成,等待最长时间即为约定剩余时间nanos
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        //如果发生取消异常/执行异常/超时异常,则方法结束
                        return futures;
                    }
                    //计算剩余时间
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }
}
invokeAny() →→→→ 执行完任一"任务",即结束方法
public abstract class AbstractExecutorService implements ExecutorService {


	/**执行完任一给定列表中的方法,即为方法结束*/
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            //断言为否,不可能发生(本身方法不具有超时时间的特性,所以不可能发生)
            assert false;
            return null;
        }
    }

    /**给定时间内,执行完任一给定列表中的方法,即为方法结束*/
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    /**InvokeAny执行体**/
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null) //参数非null判断(传入的任务为null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0) //非法参数异常(传入的"任务"数量为0)
            throw new IllegalArgumentException();
        //创建"任务"列表
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);

        try {
            ExecutionException ee = null;
            //判断有没有约定结束时间
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            //迭代器
            Iterator<? extends Callable<T>> it = tasks.iterator();
			//将第一个任务迭代出转变为Future放入待执行任务列表 
            futures.add(ecs.submit(it.next()));
            //任务数量总数,用于计算数量,放入待执行列表一个,所以数量减一
            --ntasks;
            //定义当前执行任务初始数量为1
            int active = 1;

            //无条件判断的 while 循环
            for (;;) {
                //获取任务线程执行结果(查看是否存在执行完的任务)
                Future<T> f = ecs.poll();
                //如果任一任务线程都没有执行完
                if (f == null) {
                    //如果,还存在任务没有开始执行
                    if (ntasks > 0) {
                        //待执行任务数减一
                        --ntasks;
                        //执行任务 -----------------------(涉及其他相关类源码 : 目的即执行任务,如果任务执行完成,则将结果放入统一的队列中)
                        futures.add(ecs.submit(it.next()));
                        //当前已开始任务数加一
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) { //如果有约定时间限制
                        //则同理基础上,附加时间限制
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                //如果存在任务线程执行完了,获取到了结果
                if (f != null) {
                    //因为当前任务执行完了,所以 计数器active减一
                    --active;
                    try {
                        //返回当前任务执行结果
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            for (int i = 0, size = futures.size(); i < size; i++)
                //取消剩余任务
                futures.get(i).cancel(true);
        }
    }

 }
举报

相关推荐

0 条评论