0
点赞
收藏
分享

微信扫一扫

线程池和任务

钎探穗 2022-01-22 阅读 83

因为涉及到与操作系统的交互,所以构造一个新的线程开销有些大。如果在程序中有大量生命周期短的线程,那就不需把每个任务映射到一个单独线程,而是提供一个线程池(thread pool)。线程池中包含大量准备运行的线程。位线程池提供一个Runnable ,就会有一个线程调用run方法。当run方法退出,线程不会死亡,而是给线程池中为下1一个请求准备。

Callable与Future

Runnable 封装一个异步运行的任务,可以理解为是一种没有参数和返回值的异步方法。而Callable是有返回值的。Callable接口是一个参数化类型,只有一个Call方法。

public interface Callable<V>
{
	V call() throws Exception;
	
}

类型参数是返回值的类型。例如, Callable 表示一个返回Integer对象的异步计算。

Future 保存异步计算结果,可以启动一个计算,将Future 对象交给某个线程,然后忘掉他,这个Future 对象的所有者在结果计算好之后可以获得结果。
Future接口有以下方法

V get();
V get(long timeout,TimeUnit unit);
void cance(boolean mayInterrupt);
boolean isCancelled();
boolean isDone();

第一个get方法调用会阻塞直到计算完、第二个get方法也会堵塞,但是如果计算结束之前调用会超时,会抛出TimeoutException异常。如果计算该线程时被中断,他们会抛出InterruptedException异常。如果计算完成,get方法立即返回。

如果还在计算, isDone方法返回false;反之返回true。
cancel用来取消计算,如果计算未开始直接取消,如果计算已经开始,则mayInterrupt参数为True则取消。

注意
取消一个任务涉及两个步骤。必须找到并中断底层线程。另外任务实现(call 方法中)必须感知到中断,并放弃它的工作。如果一个Future对象不知道任务在哪个线程中执行,或者如果任务没有监视执行该任务的线程的中断状态,那么取消任务没有任何效果。

FutureTask

执行Callable的一种方法是使用FutureTask,它实现Future和Runnable接口,可以构造一个线程任务来执行任务

Callable<Integer> task = ;
var futuretask = new FutureTask<Integer>(task);
var t = new Thread(futuretask);
t.start();

不过现实情况中更常见的是将一个Callable传递到一个执行器中。

执行器

执行器有许多静态工厂方法,下面的表格对其中的方法作了一个汇总。

|方法 | 描述 |
|newCachedThreadPool|必要时创建新线程;空闲线程会保留60秒|
|newFixedThreadPool | 池中包含固定数目的线程;空闲线程会一直保留|
| newWorkStealingPool | 一种适合“fork-join”任务的线程池,可以将复杂的任务分解成简单任务,空闲线程会“密取”简单任务 |
|newSingleThreadExecutor|只有一个线程的池,会顺序执行所提交的任务|
| newSingleThreadSchedExcutor | 用于调度执行固定线程池 |

控制任务组

为了执行器更有策略性,需要控制一组任务。
invokeAny
该方法提交一个Callable 对象集合中的所有对象,并返回某个已完成任务的结果。我们不知道返回的是哪个任务的结果,往往是最快完成的那个任务,对于搜索任务我们更愿意接受任何一种答案,就能使用这个方法。例如,假设需要对一个大整数进行因数分解,这是RSA解码时需要完成的一种计算。可以提交很多任务,每个任务尝试对不同范围的数进行分解。只要其中一个任务得到了答案,计算立止。

invokeAll

该方法提交一个Callable对象集合中所有对象,这个方法会阻塞,直到所有任务都完成,并返回所有任务答案的一个Future对象列表。得到计算结果后,还可以像下面这样的结果进行处理:

List<Callable<T>>tasks = ;
List<Future<T>> results = executor.invokeAll(tasks);
for(Future<T> result: results){
	processFuture(result.get());
}

在for循环中第一个result.get()会堵塞,直到第一个结果可用。

下面的代码展示了如何用Callable和执行器。在第一个计算中,统计一会目录树中包含一个给定单词的文件的个数。为每个文件建立一个单独的任务:

package executors;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ExecutorDemo {
	/*
	 * counts occurrences of a given word in a life
	 * */
	public static long occurrences(String word, Path path) {
		try(Scanner in = new Scanner(path)){
			int count = 0;
			while(in.hasNext()) {
				if(in.next().equals(word)) count++;
			}
			return count;
		}catch(IOException ex) {
			return 0;
		}
	}
	
	/*
	 * returns all descendants of a given directory
	 * 
	 * */
	public static Set<Path> descendants(Path rootDir) throws IOException{
		try(Stream<Path> entries =Files.walk(rootDir)){
			return (Set<Path>) entries.filter(Files::isRegularFile).collect(Collectors.toSet());
		}
	}
	/*
	 * Yields a task that searches for a word in a file.
	 * */
	
	public static Callable<Path> searchForTask(String word, Path path){
		return ()->{
			try(Scanner in = new Scanner(path)){
				while(in.hasNext()) {
					if(in.next().equals(word)) return path;
					if(Thread.currentThread().isInterrupted()) {
						System.out.println("Search in "+ path+"canceled");
						return null;
					}
				}
				throw new NoSuchElementException();
			}
		};
	}
	public static void main(String[] args) throws InterruptedException, ExecutionException, IOException{
		try(Scanner in = new Scanner(System.in)){
			System.out.print("Enter base directory (e. g. /opt/jdk-9-src):");
			String start = in.nextLine();
			System.out.print("Enter keyword (e.g. volatile):");
			String word = in.nextLine();
			
			Set<Path> files = descendants(Paths.get(start));
			ArrayList<Callable<Long>> tasks = new ArrayList<Callable<Long>>();
			for(Path file : files ) {
				Callable<Long> task = () -> occurrences(word, file);
				tasks.add(task);
			}
			ExecutorService executor = Executors.newCachedThreadPool();
			
			
			Instant startTime = Instant.now();
			List<Future<Long>> results = executor.invokeAll(tasks);
			long total =0;
			for(Future<Long> result: results) {
				total+=result.get();
			}
			Instant endTime = Instant.now();
			System.out.println("Occurrences of "+word+":"+total);
			System.out.println("Time elapsed:" + Duration.between(startTime, endTime).toMillis()+"ms");
			ArrayList<Callable<Path>> searchTasks = new ArrayList<Callable<Path>>();
			for(Path file: files) {
				searchTasks.add(searchForTask(word, file));
				
			}
			Path found = executor.invokeAny(searchTasks);
			System.out.println(word+"occurs in" + found);
			if(executor instanceof ThreadPoolExecutor) {//the sinigle thread executor isn't
				System.out.println("Largest pool size:"+((ThreadPoolExecutor) executor).getLargestPoolSize());
			}
			executor.shutdown();
			
		}

	}

}

举报

相关推荐

0 条评论