0
点赞
收藏
分享

微信扫一扫

ExecutorCompletionService用法简介

独兜曲 2022-02-20 阅读 98

一、ExecutorCompletionService

通常在执行一批需要返回结果的任务时,我们可以使用线程池来提高程序运行效率,通过线程池的 submit(Callable task) 不断提交异步任务,并将 Future 保存下来,之后遍历 Future,调用 get() 方法获取结果。

虽然任务都是异步执行的,但是 get Future 结果是阻塞的。例如第一个 future 需要计算5s才能返回结果,但是其他 future 不到1s就会返回计算结果,需要等待第一个 future 结果返回才能 get 到其他 future 的结果,这就白白浪费了很多时间。

此时就可以使用 ExecutorCompletionService,它实现了 CompletionService 接口。它的内部有一个先进先出的阻塞队列,用于保存执行结束的 future,通过调用 ExecutorCompletionService 的 take 方法,就可以获取到第一个已经执行完成的 Future,之后再调用 future 的 get 方法,就可以获取到最终的结果。

动动发财小手,关注 + 点赞 + 收藏不迷路。

二、Demo演示

ExecutorCompletionService 示例代码如下:

package thread;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.time.ZonedDateTime;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Author: tinker
 * @Date: 2022/02/17 13:21
 */
public class ExecutorCompletionServiceDemo {

    private static final String THREAD_FACTORY_NAME_FORMAT = "-pool-%d";
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAXIMUM_POOL_SIZE = 10;
    private static final int KEEP_ALIVE_TIME = 60;
    private static final int LINKED_BLOCKING_QUEUE_CAPACITY = 1000;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("executor-test" + THREAD_FACTORY_NAME_FORMAT)
                .build();

        Executor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE,
                KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(LINKED_BLOCKING_QUEUE_CAPACITY),
                threadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy());
        CompletionService<ExecutorCompletionServiceDemo.Result> completionService = new ExecutorCompletionService(executor);
        for (int i = 1; i <= 5; i++) {
            completionService.submit(new Task("task" + i, i));
        }

        System.out.println("print result");

        for (int i = 1; i <= 5; i++) {
            Future<Result> future = completionService.take();
            System.out.println(future.get().result);
        }

        System.exit(0);
    }

    public static class Task implements Callable<ExecutorCompletionServiceDemo.Result> {

        private String taskName;
        private long sleepSeconds;

        Task(String taskName, long sleepSeconds) {
            this.taskName = taskName;
            this.sleepSeconds = sleepSeconds;
        }

        @Override
        public ExecutorCompletionServiceDemo.Result call() throws Exception {
            Thread.sleep(sleepSeconds * 1000);
            return new Result(Thread.currentThread().getName() + ": " + taskName + " run end, time = " + ZonedDateTime.now());
        }
    }

    public static class Result {
        private String result;

        Result(String result) {
            this.result = result;
        }
    }

}


输出如下:

print result
executor-test-pool-4: task1 run end, time = 2022-02-17T16:48:45.025+08:00[Asia/Shanghai]
executor-test-pool-3: task2 run end, time = 2022-02-17T16:48:45.947+08:00[Asia/Shanghai]
executor-test-pool-2: task3 run end, time = 2022-02-17T16:48:46.946+08:00[Asia/Shanghai]
executor-test-pool-1: task4 run end, time = 2022-02-17T16:48:47.945+08:00[Asia/Shanghai]
executor-test-pool-0: task5 run end, time = 2022-02-17T16:48:48.946+08:00[Asia/Shanghai]

通过输出结果可以看出,线程0 sleep时间为5s,最后一个print,线程4 sleep时间为1s,第一个print。

引用:
1.https://blog.csdn.net/windrui/article/details/101366444

举报

相关推荐

0 条评论