0
点赞
收藏
分享

微信扫一扫

CompletableFuture学习

小时候是个乖乖 2021-09-24 阅读 61
日记本

CompletableFuture是JDK1.8引入的类,使用了lambda和流水线的思想,为多个以同步任务和异步的复合操作结合在一起,能够更好的利用CPU并行计算。

引入CompletableFuture的好处

Java5引入的Future接口,通常用法如下:把一个耗时的任务放到线程池中执行,然后先执行其他方法,如果没有耗时的任务就没办法往下执行的时候用Future.get()超时获取任务的结果,如果任务没有返回,就会阻塞在这个方法。

 ExecutorService executorService = new ThreadPoolExecutor(10, 10,
                60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
        FutureTask<Integer> future = (FutureTask<Integer>) executorService.submit(()->doSomthing());
        doOtherThings();
        try {
            future.get(100L, TimeUnit.SECONDS);
        } catch (ExecutionException | InterruptedException e ) {
            
        } catch (TimeoutException e) {
            
        }

我们可以看出Future能够在一定程度上减少线程的阻塞等待时间,但是存在一些缺点:

  1. 如果异步任务中抛出异常,Future不能返回异常的具体原因。
  2. 对于有依赖的任务编程不够简洁
    CompletableFuture实现了CompletionStage和Future,不仅能够异步获取结果,还能流水线式地处理各种任务。

API

CompletableFuture提供了很多API,结合lambda表达式,给开发者带来了极大方便。可以先通过下面的例子来了解CompletableFuture的流式处理。

public List<String> findPrices(String product) {
        List<CompletableFuture<String>> priceFuture = shops.stream()
                .map(shop-> CompletableFuture.supplyAsync(()->shop.getPrice(product), executor))
                .map(future->((CompletableFuture) future).thenApply(Quote::parse))
                .map(future-> ((CompletableFuture) future)
                        .thenCompose(quote->CompletableFuture.supplyAsync(()->doSomthing, executor)));
    }

三次map处理所有的商品,对于每一个商品先计算出价格,再申请折扣,每一个商品都利用supplyAsync方法实现异步化的流式处理。你会发现CompletableFuture就像一条流水线一样,依次处理各种操作。

supplyAsync

supplyAsync方法接受Supplier作为参数,返回CompletableFuture对象,然后使用ForkJoinPool中的线程执行任务,当然,你也可以使用supplyAsync的重载版本,定制化线程池。

   public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }

使用实例:

public Future<Double> getPrice(String product){
  return CompletableFuture.supplyAsync(()->calculatePrice(product));
}
allOf

allOf方法接受可变长度的CompletableFuture作为参数,然后调用join方法,可以让多个CompletableFuture对象并行计算,全都计算完成后返回结果。

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }

使用实例:

           for(List<Long> ids:allIds){
                CompletableFuture<String> cf = CompletableFuture.supplyAsync(() ->{
                   return "finish";
                }, taskExecutor);
                all = CompletableFuture.allOf(cf);
            }
       if(Objects.nonNull(all)){
                all.join();
            }
runAsync

当不需要获取任务的结果时,可以使用runAsync方法,该方法有传递线程池参数的重载方法

    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }
completion

与并发流相比

Java8的并发流

    public List findPrices(String product) {
        return (List) shops.parallelStream().map(shop->String.format(shop.toString()))
                .collect(toList());
    }

CompletableFuture处理并发

public List findPrices(String product) {
        return (List) shops.stream().map(shop->CompletableFuture
                .supplyAsync(()->String.format(shop.toString())))
                .collect(toList());
    }

使用建议:如果任务是CPU密集型任务,那么推荐用stream接口,实现简单,效率也可能是最高的。但如果任务需要等待IO或者需要网络连接,那么CompletableFuture更合适,因为我们可以定制化线程池,来更好的利用CPU资源,根据线程池大小公式:Nthread =NCPUUcpu(1+W/C)(其中NCPU是CPU核数,Ucpu是期望的CPU利用率,W/C是等待时间与计算时间比率),那么可以适用调大线程池的大小。

与CountDownLatch对比

我们知道CountDownLatch能够调用await方法等待所有线程完成后进行下一步动作,这与CompletableFuture的allOf方法作用类似,但是CompletableFuture更加灵活,而且代码可读性更好,应该优先考虑使用。


源码

举报

相关推荐

0 条评论