因为涉及到与操作系统的交互,所以构造一个新的线程开销有些大。如果在程序中有大量生命周期短的线程,那就不需把每个任务映射到一个单独线程,而是提供一个线程池(thread pool)。线程池中包含大量准备运行的线程。位线程池提供一个Runnable ,就会有一个线程调用run方法。当run方法退出,线程不会死亡,而是给线程池中为下1一个请求准备。
Callable与Future
Runnable 封装一个异步运行的任务,可以理解为是一种没有参数和返回值的异步方法。而Callable是有返回值的。Callable接口是一个参数化类型,只有一个Call方法。
public interface Callable<V>
{
V call() throws Exception;
}
类型参数是返回值的类型。例如, Callable 表示一个返回Integer对象的异步计算。
Future 保存异步计算结果,可以启动一个计算,将Future 对象交给某个线程,然后忘掉他,这个Future 对象的所有者在结果计算好之后可以获得结果。
Future接口有以下方法
V get();
V get(long timeout,TimeUnit unit);
void cance(boolean mayInterrupt);
boolean isCancelled();
boolean isDone();
第一个get方法调用会阻塞直到计算完、第二个get方法也会堵塞,但是如果计算结束之前调用会超时,会抛出TimeoutException异常。如果计算该线程时被中断,他们会抛出InterruptedException异常。如果计算完成,get方法立即返回。
如果还在计算, isDone方法返回false;反之返回true。
cancel用来取消计算,如果计算未开始直接取消,如果计算已经开始,则mayInterrupt参数为True则取消。
注意
取消一个任务涉及两个步骤。必须找到并中断底层线程。另外任务实现(call 方法中)必须感知到中断,并放弃它的工作。如果一个Future对象不知道任务在哪个线程中执行,或者如果任务没有监视执行该任务的线程的中断状态,那么取消任务没有任何效果。
FutureTask
执行Callable的一种方法是使用FutureTask,它实现Future和Runnable接口,可以构造一个线程任务来执行任务
Callable<Integer> task = ;
var futuretask = new FutureTask<Integer>(task);
var t = new Thread(futuretask);
t.start();
不过现实情况中更常见的是将一个Callable传递到一个执行器中。
执行器
执行器有许多静态工厂方法,下面的表格对其中的方法作了一个汇总。
|方法 | 描述 |
|newCachedThreadPool|必要时创建新线程;空闲线程会保留60秒|
|newFixedThreadPool | 池中包含固定数目的线程;空闲线程会一直保留|
| newWorkStealingPool | 一种适合“fork-join”任务的线程池,可以将复杂的任务分解成简单任务,空闲线程会“密取”简单任务 |
|newSingleThreadExecutor|只有一个线程的池,会顺序执行所提交的任务|
| newSingleThreadSchedExcutor | 用于调度执行固定线程池 |
控制任务组
为了执行器更有策略性,需要控制一组任务。
invokeAny
该方法提交一个Callable 对象集合中的所有对象,并返回某个已完成任务的结果。我们不知道返回的是哪个任务的结果,往往是最快完成的那个任务,对于搜索任务我们更愿意接受任何一种答案,就能使用这个方法。例如,假设需要对一个大整数进行因数分解,这是RSA解码时需要完成的一种计算。可以提交很多任务,每个任务尝试对不同范围的数进行分解。只要其中一个任务得到了答案,计算立止。
invokeAll
该方法提交一个Callable对象集合中所有对象,这个方法会阻塞,直到所有任务都完成,并返回所有任务答案的一个Future对象列表。得到计算结果后,还可以像下面这样的结果进行处理:
List<Callable<T>>tasks = ;
List<Future<T>> results = executor.invokeAll(tasks);
for(Future<T> result: results){
processFuture(result.get());
}
在for循环中第一个result.get()会堵塞,直到第一个结果可用。
下面的代码展示了如何用Callable和执行器。在第一个计算中,统计一会目录树中包含一个给定单词的文件的个数。为每个文件建立一个单独的任务:
package executors;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class ExecutorDemo {
/*
* counts occurrences of a given word in a life
* */
public static long occurrences(String word, Path path) {
try(Scanner in = new Scanner(path)){
int count = 0;
while(in.hasNext()) {
if(in.next().equals(word)) count++;
}
return count;
}catch(IOException ex) {
return 0;
}
}
/*
* returns all descendants of a given directory
*
* */
public static Set<Path> descendants(Path rootDir) throws IOException{
try(Stream<Path> entries =Files.walk(rootDir)){
return (Set<Path>) entries.filter(Files::isRegularFile).collect(Collectors.toSet());
}
}
/*
* Yields a task that searches for a word in a file.
* */
public static Callable<Path> searchForTask(String word, Path path){
return ()->{
try(Scanner in = new Scanner(path)){
while(in.hasNext()) {
if(in.next().equals(word)) return path;
if(Thread.currentThread().isInterrupted()) {
System.out.println("Search in "+ path+"canceled");
return null;
}
}
throw new NoSuchElementException();
}
};
}
public static void main(String[] args) throws InterruptedException, ExecutionException, IOException{
try(Scanner in = new Scanner(System.in)){
System.out.print("Enter base directory (e. g. /opt/jdk-9-src):");
String start = in.nextLine();
System.out.print("Enter keyword (e.g. volatile):");
String word = in.nextLine();
Set<Path> files = descendants(Paths.get(start));
ArrayList<Callable<Long>> tasks = new ArrayList<Callable<Long>>();
for(Path file : files ) {
Callable<Long> task = () -> occurrences(word, file);
tasks.add(task);
}
ExecutorService executor = Executors.newCachedThreadPool();
Instant startTime = Instant.now();
List<Future<Long>> results = executor.invokeAll(tasks);
long total =0;
for(Future<Long> result: results) {
total+=result.get();
}
Instant endTime = Instant.now();
System.out.println("Occurrences of "+word+":"+total);
System.out.println("Time elapsed:" + Duration.between(startTime, endTime).toMillis()+"ms");
ArrayList<Callable<Path>> searchTasks = new ArrayList<Callable<Path>>();
for(Path file: files) {
searchTasks.add(searchForTask(word, file));
}
Path found = executor.invokeAny(searchTasks);
System.out.println(word+"occurs in" + found);
if(executor instanceof ThreadPoolExecutor) {//the sinigle thread executor isn't
System.out.println("Largest pool size:"+((ThreadPoolExecutor) executor).getLargestPoolSize());
}
executor.shutdown();
}
}
}