CompletableFuture是JDK1.8引入的类,使用了lambda和流水线的思想,为多个以同步任务和异步的复合操作结合在一起,能够更好的利用CPU并行计算。
引入CompletableFuture的好处
Java5引入的Future接口,通常用法如下:把一个耗时的任务放到线程池中执行,然后先执行其他方法,如果没有耗时的任务就没办法往下执行的时候用Future.get()超时获取任务的结果,如果任务没有返回,就会阻塞在这个方法。
ExecutorService executorService = new ThreadPoolExecutor(10, 10,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
FutureTask<Integer> future = (FutureTask<Integer>) executorService.submit(()->doSomthing());
doOtherThings();
try {
future.get(100L, TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException e ) {
} catch (TimeoutException e) {
}
我们可以看出Future能够在一定程度上减少线程的阻塞等待时间,但是存在一些缺点:
- 如果异步任务中抛出异常,Future不能返回异常的具体原因。
- 对于有依赖的任务编程不够简洁
CompletableFuture实现了CompletionStage和Future,不仅能够异步获取结果,还能流水线式地处理各种任务。
API
CompletableFuture提供了很多API,结合lambda表达式,给开发者带来了极大方便。可以先通过下面的例子来了解CompletableFuture的流式处理。
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFuture = shops.stream()
.map(shop-> CompletableFuture.supplyAsync(()->shop.getPrice(product), executor))
.map(future->((CompletableFuture) future).thenApply(Quote::parse))
.map(future-> ((CompletableFuture) future)
.thenCompose(quote->CompletableFuture.supplyAsync(()->doSomthing, executor)));
}
三次map处理所有的商品,对于每一个商品先计算出价格,再申请折扣,每一个商品都利用supplyAsync方法实现异步化的流式处理。你会发现CompletableFuture就像一条流水线一样,依次处理各种操作。
supplyAsync
supplyAsync方法接受Supplier作为参数,返回CompletableFuture对象,然后使用ForkJoinPool中的线程执行任务,当然,你也可以使用supplyAsync的重载版本,定制化线程池。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
使用实例:
public Future<Double> getPrice(String product){
return CompletableFuture.supplyAsync(()->calculatePrice(product));
}
allOf
allOf方法接受可变长度的CompletableFuture作为参数,然后调用join方法,可以让多个CompletableFuture对象并行计算,全都计算完成后返回结果。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
使用实例:
for(List<Long> ids:allIds){
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() ->{
return "finish";
}, taskExecutor);
all = CompletableFuture.allOf(cf);
}
if(Objects.nonNull(all)){
all.join();
}
runAsync
当不需要获取任务的结果时,可以使用runAsync方法,该方法有传递线程池参数的重载方法
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
completion
与并发流相比
Java8的并发流
public List findPrices(String product) {
return (List) shops.parallelStream().map(shop->String.format(shop.toString()))
.collect(toList());
}
CompletableFuture处理并发
public List findPrices(String product) {
return (List) shops.stream().map(shop->CompletableFuture
.supplyAsync(()->String.format(shop.toString())))
.collect(toList());
}
使用建议:如果任务是CPU密集型任务,那么推荐用stream接口,实现简单,效率也可能是最高的。但如果任务需要等待IO或者需要网络连接,那么CompletableFuture更合适,因为我们可以定制化线程池,来更好的利用CPU资源,根据线程池大小公式:Nthread =NCPUUcpu(1+W/C)(其中NCPU是CPU核数,Ucpu是期望的CPU利用率,W/C是等待时间与计算时间比率),那么可以适用调大线程池的大小。
与CountDownLatch对比
我们知道CountDownLatch能够调用await方法等待所有线程完成后进行下一步动作,这与CompletableFuture的allOf方法作用类似,但是CompletableFuture更加灵活,而且代码可读性更好,应该优先考虑使用。