线程安全的集合
当多个线程并发修改同同一个数据结构时(例如hash表)容易破坏这个数据结构。可以通过加锁来保护这些数据结构,或者使用java提供的一些线程安全的集合。
堵塞队列(BlockingQueue)
很多线程问题可以使用一个或多个队列来解决,当然使用的线程安全队列必须考虑锁和条件。
当向已满的队列添加元素时,或是向空的队列取出元素时堵塞队列会导致线程堵塞。
若工作线程运行的比其他线程快,则队列会被填满,知道其他线程赶上来,如果工作线程比其他线程慢,则其他线程等待结果时线程会堵塞。
堵塞队列方法
当向空队列中取出元素或是向满队列中添加元素时remove、add、element方法会抛出异常,所以应答使用,offer、poll、和peek方法作为代替(不抛出异常而是给出一个错误提示)。
还有带时间参数的offer和poll方法
boolean success = queue.offer(x,100,TimeUnit.MILLISECONDS);
// 在100毫秒时间内在队尾插入一个元素成功返回true失败返回false
堵塞队列变体
java.util.concurrent包提供了堵塞队列的几个变体
- LinkedBlockingQueue 容器没有上界,也可以选择指定一个最大容量
- LinkedBlockingDeque 和上面一样,但是时双端的
- ArrayBlockingQueue 在构造时需要指定参数,并且有一个可选参数来指定公平性(公平性会降低性能,非必要不使用)
- PriorityBlockingQueue 是一个优先队列,按照优先级顺序移除容器没有上限,若队列为空移除时会堵塞
- DelayQueue 只有延迟结束才能从队列中移除,延迟队列负值表示延迟结束,
如何使用一个堵塞队列来管理控制一组线程
package threads;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class BlockingQueueTest {
private static final int FILE_QUEUE_SIZE = 10;//指定队列容量
private static final int SEARCH_THREADS = 100;
private static final Path DUMMY = Path.of("");
private static BlockingQueue<Path> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);
public static void main(String[] args) {
try (Scanner input = new Scanner(System.in)){
System.out.println("ENTER base directory:");
String directory = input.nextLine();
System.out.print("ENTER keyword:");
String keyword = input.nextLine();
Runnable enumerator = ()->{
try {
enumerates(Path.of(directory));
}catch(Exception e){
e.getStackTrace();
}
};
new Thread(enumerator).start();
for (int i = 0; i < SEARCH_THREADS; i++) {
Runnable searcher = ()->{
try {
boolean done = false;
while (!done){
Path file = queue.take();
if (file == DUMMY){
queue.put(file);
done = true;
}
else seach(file, keyword);
}
}catch(Exception e){
e.getStackTrace();
}
};
new Thread(searcher).start();
}
}
}
public static void enumerates(Path directory) throws IOException,InterruptedException {
try(Stream<Path> children = Files.list(directory)){
for(Path child : children.collect(Collectors.toList())){
if(Files.isDirectory(child)){
enumerates(child);
}
else{
queue.put(child);
}
}
}
}
public static void seach(Path file,String keyword) throws IOException{
try(Scanner input = new Scanner(file, StandardCharsets.UTF_8)){
int lineNumber = 0;
while (input.hasNextLine()){
lineNumber++;
String line = input.nextLine();
if (line.contains(keyword))
System.out.println(file+" : "+lineNumber+" : "+line);
}
}
}
}
运行结果
生产者线程美剧目录下所有文件放入堵塞队列中,同时一堆启动消费者队列取出文件读取文件并取出关键字所在的那一行并进行打印
注意:这里不需要显示同步,使用数据结构作为一种同步机制