Java ForkJoin框架的原理及用法

Java Fork/Join 框架是在 JDK7 中引入的,在 java.util.concurrent 包中,它提供了一种并行执行任务的方式,能够将一个大任务拆分成多个小任务进行处理,其中包括我们熟知的 MapReduce。

Java Fork/Join 框架

什么是 Java Fork/Join 框架

Java Fork/Join 框架是在 JDK7 中引入的,在 java.util.concurrent 包中,它提供了一种并行执行任务的方式,能够将一个大任务拆分成多个小任务进行处理,其中包括我们熟知的 MapReduce。

Fork/Join 的原理

Java Fork/Join 框架的原理是“工作窃取”,分为两种工作线程:Worker 和 Steal Worker

  • Worker:执行工作的线程。
  • Steal Worker:空闲线程,从其他 Worker 线程中窃取任务执行。

具体地说,当一个 Worker 完成任务后,它会去队列中找新的任务,如果队列为空,它会从其他 Worker 的任务队列的末尾窃取任务执行,这种方式将不断重复,直到所有任务都被处理完成。

这种工作窃取的方式可以减少线程之间的竞争和等待时间,提高并行处理的效率。

Fork/Join 的用法

Java Fork/Join 框架的主要类是 ForkJoinPool 和 ForkJoinTask,其中 ForkJoinPool 是线程池,ForkJoinTask 是任务。

您可以使用 ForkJoinPool 来管理 ForkJoinTask,完成并行任务处理。

创建 ForkJoinPool 对象

当您需要创建一个 ForkJoinPool 对象时,需要指定一个并行度参数,这个参数表示 ForkJoinPool 中线程的数量。一般情况下,可以通过 Runtime.getRuntime().availableProcessors() 方法获取当前机器的处理器数量,然后适当调整并行度参数。

ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);

创建 ForkJoinTask 对象

ForkJoinTask 是抽象类,不能直接使用。您需要通过创建 ForkJoinTask 的子类来执行任务。您可以选择两种类型的子类:RecursiveAction 和 RecursiveTask。

  • RecursiveAction:适用于没有返回值的任务。
  • RecursiveTask:适用于有返回值的任务。

举例说明:

class MyRecursiveTask extends RecursiveTask<Integer> {

    private int[] array;
    private int start;
    private int end;

    MyRecursiveTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start < 100) { // 如果任务量很小,就直接计算结果
            int result = 0;
            for (int i = start; i < end; i++) {
                result += array[i];
            }
            return result;
        } else {
            int mid = (start + end) >>> 1;
            MyRecursiveTask leftTask = new MyRecursiveTask(array, start, mid);
            MyRecursiveTask rightTask = new MyRecursiveTask(array, mid, end);
            invokeAll(leftTask, rightTask); // 将任务交给 ForkJoinPool 管理
            return leftTask.join() + rightTask.join(); // 合并计算结果
        }
    }
}

在上述示例中,我们创建了 MyRecursiveTask 类,继承自 RecursiveTask 类,并重写了 compute() 方法实现了分治计算。

提交任务到 ForkJoinPool 对象中

当您创建了 ForkJoinTask 的对象后,您就可以将它提交给 ForkJoinPool,自动进行并行计算。这时,您需要使用 ForkJoinPool 的 submit() 方法,将任务封装成 ForkJoinTask 的形式进行提交。如果这个任务是 root 任务,那么您可以通过 fork() 方法将任务分割成尽可能小的任务进行计算。

示例:

ForkJoinPool forkJoinPool = new ForkJoinPool();

int[] array = new int[100000];
for (int i = 0; i < array.length; i++) {
    array[i] = i;
}

MyRecursiveTask task = new MyRecursiveTask(array, 0, array.length);

int sum = forkJoinPool.invoke(task);

System.out.println("result: " + sum);

在上述示例中,我们创建了一个大小为 100000 的数组,并将它传给 MyRecursiveTask 类。调用 ForkJoinPool 的 invoke() 方法,提交任务并获取任务的返回值。

使用场景

Java Fork/Join 框架适用于大规模并行处理任务的场景,比如搜索相似度、矩阵计算、排序等。

同时,您需要注意,这个框架的并行度很高,会以尽可能多的线程来让您的任务并发执行,在小任务和非CPU密集型的场景中,并不会有比 ThreadPoolExecutor 更好的表现。

本文标题为:Java ForkJoin框架的原理及用法