一、简介
在多核处理器普及的今天,充分利用 CPU 的并行计算能力已成为高性能 Java 应用开发的关键。传统的线程池和 ExecutorService
虽然能够满足大部分并发需求,但在面对“分治类”任务(如排序、搜索、图像处理等)时效率并不高。
二、什么是 Fork/Join 框架?
2.1 概念定义
Fork/Join 是一种基于 工作窃取算法(Work-Stealing Algorithm) 的并发执行框架。
它通过将一个大任务 Fork(拆分) 成多个子任务,并行执行;最后再将结果 Join(合并) 起来,从而实现高效并行计算。
2.2 核心组件
组件 | 说明 |
| 线程池,负责管理所有 Fork/Join 任务 |
| 所有任务的抽象基类,常用子类有 |
| 有返回值的递归任务 |
| 无返回值的递归任务 |
三、Fork/Join 的工作原理
3.1 工作流程图示
Main Task
│
▼
fork() → 子任务1、子任务2 ...
│
▼
join() ← 合并子任务结果
3.2 工作窃取机制
每个线程维护自己的任务队列(双端队列),当某个线程完成自己的任务后,会从其他线程队列尾部“偷取”任务来继续执行,从而提高整体吞吐量。
四、快速上手:第一个 Fork/Join 示例
4.1 需求:并行计算数组和
public class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 100;
private int[] array;
private int start, end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork(); // 异步执行左子任务
right.fork(); // 异步执行右子任务
return left.join() + right.join(); // 合并结果
}
}
}
4.2 主程序调用
public static void main(String[] args) {
int[] array = new int[10000];
Arrays.fill(array, 1); // 全部填充为 1
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
int result = pool.invoke(task);
System.out.println("总和:" + result); // 输出 10000
}
五、Fork/Join 适用场景
场景 | 是否适合 Fork/Join |
大数据量计算(如排序、查找) | ✅ 非常适合 |
图像处理、矩阵运算 | ✅ 适合 |
网络请求、IO 操作 | ❌ 不适合(阻塞型任务) |
任务之间强依赖 | ❌ 不适合 |
递归/分治类任务 | ✅ 高度匹配 |
六、进阶实战:并行排序(Merge Sort)
使用 Fork/Join 来实现一个高效的并行归并排序算法:
public class MergeSortTask extends RecursiveAction {
private int[] array;
private int left, right;
public MergeSortTask(int[] array, int left, int right) {
this.array = array;
this.left = left;
this.right = right;
}
@Override
protected void compute() {
if (right - left <= 1) return;
int mid = (left + right) / 2;
MergeSortTask leftTask = new MergeSortTask(array, left, mid);
MergeSortTask rightTask = new MergeSortTask(array, mid, right);
leftTask.fork();
rightTask.fork();
leftTask.join();
rightTask.join();
merge(array, left, mid, right);
}
private void merge(int[] arr, int left, int mid, int right) {
int[] temp = new int[right - left];
int i = left, j = mid, k = 0;
while (i < mid && j < right)
temp[k++] = arr[i] <= arr[j] ? arr[i++] : arr[j++];
while (i < mid) temp[k++] = arr[i++];
while (j < right) temp[k++] = arr[j++];
System.arraycopy(temp, 0, arr, left, temp.length);
}
}