0
点赞
收藏
分享

微信扫一扫

Fork/Join 框架解析

一、简介

在多核处理器普及的今天,充分利用 CPU 的并行计算能力已成为高性能 Java 应用开发的关键。传统的线程池和 ExecutorService 虽然能够满足大部分并发需求,但在面对“分治类”任务(如排序、搜索、图像处理等)时效率并不高。

二、什么是 Fork/Join 框架?

2.1 概念定义

Fork/Join 是一种基于 工作窃取算法(Work-Stealing Algorithm) 的并发执行框架。

它通过将一个大任务 Fork(拆分) 成多个子任务,并行执行;最后再将结果 Join(合并) 起来,从而实现高效并行计算。

2.2 核心组件

组件

说明

ForkJoinPool

线程池,负责管理所有 Fork/Join 任务

ForkJoinTask

所有任务的抽象基类,常用子类有 RecursiveTaskRecursiveAction

RecursiveTask<V>

有返回值的递归任务

RecursiveAction

无返回值的递归任务

三、Fork/Join 的工作原理

3.1 工作流程图示

Main Task
   │
   ▼
fork() → 子任务1、子任务2 ...
   │
   ▼
join() ← 合并子任务结果

3.2 工作窃取机制

每个线程维护自己的任务队列(双端队列),当某个线程完成自己的任务后,会从其他线程队列尾部“偷取”任务来继续执行,从而提高整体吞吐量。

四、快速上手:第一个 Fork/Join 示例

4.1 需求:并行计算数组和

public class SumTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 100;
    private int[] array;
    private int start, end;

    public SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            int mid = (start + end) / 2;
            SumTask left = new SumTask(array, start, mid);
            SumTask right = new SumTask(array, mid, end);

            left.fork();  // 异步执行左子任务
            right.fork(); // 异步执行右子任务

            return left.join() + right.join(); // 合并结果
        }
    }
}

4.2 主程序调用

public static void main(String[] args) {
    int[] array = new int[10000];
    Arrays.fill(array, 1); // 全部填充为 1

    ForkJoinPool pool = new ForkJoinPool();
    SumTask task = new SumTask(array, 0, array.length);
    int result = pool.invoke(task);

    System.out.println("总和:" + result); // 输出 10000
}

五、Fork/Join 适用场景

场景

是否适合 Fork/Join

大数据量计算(如排序、查找)

✅ 非常适合

图像处理、矩阵运算

✅ 适合

网络请求、IO 操作

❌ 不适合(阻塞型任务)

任务之间强依赖

❌ 不适合

递归/分治类任务

✅ 高度匹配

六、进阶实战:并行排序(Merge Sort)

使用 Fork/Join 来实现一个高效的并行归并排序算法:

public class MergeSortTask extends RecursiveAction {
    private int[] array;
    private int left, right;

    public MergeSortTask(int[] array, int left, int right) {
        this.array = array;
        this.left = left;
        this.right = right;
    }

    @Override
    protected void compute() {
        if (right - left <= 1) return;

        int mid = (left + right) / 2;
        MergeSortTask leftTask = new MergeSortTask(array, left, mid);
        MergeSortTask rightTask = new MergeSortTask(array, mid, right);

        leftTask.fork();
        rightTask.fork();

        leftTask.join();
        rightTask.join();

        merge(array, left, mid, right);
    }

    private void merge(int[] arr, int left, int mid, int right) {
        int[] temp = new int[right - left];
        int i = left, j = mid, k = 0;

        while (i < mid && j < right)
            temp[k++] = arr[i] <= arr[j] ? arr[i++] : arr[j++];

        while (i < mid) temp[k++] = arr[i++];
        while (j < right) temp[k++] = arr[j++];

        
        System.arraycopy(temp, 0, arr, left, temp.length);
    }
}


举报

相关推荐

0 条评论