为什么需要CompletableFuture?
Future是Java5添加的类,用来描述一个异步计算的结果。 主动调用isDone
获取计算是否结束,get
阻塞获取计算结果,cancel
主动停止任务。
但是,这些方法不足之处在于,想要获取结果,我们只能通过轮询或者阻塞的方式。有没有一种方式,通过观察者模式当计算结果完成后及时通知监听者呢?
CompletableFuture登场了,包含50多个方法,帮助我们实现Future的各种编排。
创建CompletableFuture
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
- runAsync 看参数即可知道,这是没有返回值的任务
- supplyAsync 消费者lamda函数,泛型U即为返回值,
- 如果不指定executor,默认使用ForkJoinPool.commonPool()作为线程池,一般我们都使用自己定义的线程池
示例
CompletableFuture.supplyAsync(() -> newQuery().select(fields)
.eq(isNotEmpty(logicDelete), "isDrop", logicDelete)
.in(idName, idList)
.getBeanList()
, executor)
Future完成的回调
完成时回调,返回原对象
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
BiConsumer<? super T,? super Throwable> fn
传入 T 、 异常E,相当于做出对象消费
- 任务完成后回调action,返回的也是CompletableFuture
- 无Async结尾用本线程同步执行
- 有Async结尾代表交给线程池异步执行,如果不指定executor,默认使用ForkJoinPool.commonPool()作为线程池
- exceptionally方法返回一个新的CompletableFuture,当原始的CompletableFuture抛出异常的时候,就会触发这个CompletableFuture的计算,调用function计算值,否则如果原始的CompletableFuture正常计算完后,这个新的CompletableFuture也计算完成,它的值和原始的CompletableFuture的计算的值相同。也就是这个
- exceptionally方法用来处理异常的情况
CompletableFuture.supplyAsync().whenComplete((t,e)-> {
System.out.println("执行完成" + t);
})
完成时回调,返回转换后对象
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
三个方法区别和上面一样就不赘述了
Function<? super T,? extends U> fn
传入 T 返回 U ,相当于做出对象的转换
示例
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 100;});
CompletableFuture<String> f = future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString());
// 其实没必要这样写呀
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return (100*10) + "";});
无异常消费型
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
三个方法区别和上面一样就不赘述了
Consumer<? super T> fn
消费生成的T 无返回
组合多个CompletableFuture
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
- allOf方法是当所有的CompletableFuture都执行完后执行计算。 (allOf 执行完,是不是cfs里的future都有结果了,还是需要allOf().join())
- anyOf方法是当任意一个CompletableFuture执行完后就会执行计算,计算的结果相同。
示例
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(futureArray);
CompletableFuture<List<Object>> listCompletableFuture = voidCompletableFuture.thenApply(v -> Arrays.stream(futureArray).map(CompletableFuture::join).collect(Collectors.toList()));
listCompletableFuture.get().forEach(t -> result.addAll((Collection<? extends T>) t));
获取结果
public T get() // 阻塞获取结果
public T get(long timeout, TimeUnit unit) // 超时获取
public T getNow(T valueIfAbsent) // 立即获取
public T join() // 阻塞获取结果
getNow()
有点特殊,如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent值。join()
返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get对抛出的异常的处理有些细微的区别,你可以运行下面的代码进行比较:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1/0;
return 100;
});
//future.join();
future.get();
参考: https://colobu.com/2016/02/29/Java-CompletableFuture/ https://blog.csdn.net/lycyingO/article/details/120030162 https://blog.csdn.net/qq_40813329/article/details/125498494