IntStream
//类似for循环 for(int i = 0; i < 3; i ++)
IntStream intStream = IntStream.range(0, 3);
supplyAsync
运行一个异步任务并且返回结果
当任务不需要返回任何东西的时候可以使用
CompletableFuture.runAsync()
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of the asynchronous computation";
}
});
//如果get的时候还没return,则会阻塞当前进程
String result = future.get();
System.out.println(result);
传入自定义线程池
CompletableFuture
在执行异步任务的时候默认会从全局的 ForkJoinPool.commonPool()
获得一个线程中执行这些任务,也可以传入自定义的线程池。
thenApply(), thenAccept()和thenRun()在异步操作完成之后执行回调
thenApply
thenApply会将一个function<R,T>函数式接口作为入参,即接受一个T类型的参数,产出一个R类型的结果。
// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Rajeev";
});
// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
return "Hello " + name;
});
// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev
其实可以连起来写
CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Rajeev";
}).thenApply(name -> {
return "Hello " + name;
}).thenApply(greeting -> {
return greeting + ", Welcome to the CalliCoder Blog";
});
System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog
thenAccept
可以将supplyAsync
返回的参数作为入参,本身返回一个CompletableFuture<Void>
// thenAccept() example
CompletableFuture.supplyAsync(() -> {
return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
System.out.println("Got product detail from remote service " + product.getName())
});
thenRun
和上面的thenAccept
差不多,区别在于其不能访问supplyAsync
返回的参数,也就是没有入参。
在异步回调中传入自定义线程池
要注意直接在supplyAsync
接上thenApply/thenAccept/thenRun
操作,后续的操作都是在当前主线程中执行的
组合多个CompletableFuture
让他们独立完成CompletableFuture.allOf
让两个任务独立完成后做一些事情
CompletableFuture<String> one = CompletableFuture.supplyAsync(() -> {
System.out.println(" do something one");
return "one";
});
CompletableFuture<String> two = CompletableFuture.supplyAsync(() -> {
System.out.println(" do something two");
return "two";
});
//thenComposes是用于有序的组合,比如第一个完成时再进行下一个
//thenCombine是独立的组合,比如2个独立的任务完成的时候再进行下一步
CompletableFuture<Integer> three = one.thenCombine(two, (oneResult, twoResult) -> {
System.out.println(oneResult);
System.out.println(twoResult);
return 3;
});
//如果是要等待一批List的CompletableFuture完成可以用CompletableFuture.allOf
List<CompletableFuture<String>> list = Arrays.asList(one, two);
CompletableFuture<Void> allResult = CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()]));
try {
System.out.println(three.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
exceptionally/handle回调处理异常
exceptionally
类似catch
,handle
则类似finally
如果在原始的supplyAsync()任务中发生一个错误,这时候没有任何thenApply会被调用并且future将以一个异常结束。如果在第一个thenApply发生错误,这时候第二个和第三个将不会被调用,同样的,future将以异常结束。
Integer age = -1;
CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
if(age < 0) {
throw new IllegalArgumentException("Age can not be negative");
}
if(age > 18) {
return "Adult";
} else {
return "Child";
}
}).exceptionally(ex -> {
System.out.println("Oops! We have an exception - " + ex.getMessage());
return "Unknown!";
});
try {
System.out.println("Maturity : " + maturityFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}