9.2 ForkJoinPool

Java 7 引入了一个新的线程池:ForkJoinPool 类。这个类看上去和其他任何线程池都很像;和 ThreadPoolExecutor 类一样,它也实现了 ExecutorExecutorService 接口。在支持这些接口方面,ForkJoinPool 在内部会使用一个无界任务列表,供构造器中所指定数目(如果所选的是无参构造器,则为该机器上的 CPU 数)的线程来运行。

ForkJoinPool 类是为配合分治算法的使用而设计的:任务可以递归地分解为子集。这些子集可以并行处理,然后每个子集的结果被归并到一个结果中。一个经典的例子就是快速排序算法。

分治算法的重点是,算法会创建大量的任务,而这些任务只有相对较少的几个线程来管理。比如要排序一个包含 1000 万个元素的数组。首先创建单独的任务来执行 3 个操作:排序包含前面 500 万个元素的子数组,再排序包含后面 500 万个元素的子数组,然后合并两个子数组。

类似地,要排序包含 500 万个元素的数组,可以分别排序包含 250 万个元素的子数组,然后合并子数组。一直递归到某个点(比如到子数组包含 10 个元素时),这时在子数组上使用插入排序直接处理更为高效。图 9-1 演示了其工作方式。

9.2 ForkJoinPool - 图1

图 9-1:递归快速排序中的任务

最后会有超过 100 万个任务来排序叶子数组(每个数组少于 10 个元素,这时候直接排序即可;这里只是用 10 来举例,实际值会随实现的不同而有所变化。在目前的 Java 库实现中,当数组少于 47 个元素时 1,会采用插入排序)。需要 50 多万个任务来归并那些排好序的数组,归并下一级又需要 25 万个任务,依此类推。最后会有 2 097 151 个任务。

1可以参考 java.util.DualPivotQuicksort 的实现。——译者注

更大的问题是,所有任务都要等待它们派生出的任务先完成,然后才能完成。对于元素数少于 10 的子数组,直接对它们做排序的任务必须优先完成;在此之后,创建相应子数组的任务才能归并其子数组的结果,依此类推:链条上的所有任务依次归并,直到整个数组被归并为最终的、排序好的结果。

因为父任务必须等待子任务完成,所以无法使用 ThreadPoolExecutor 高效实现这个算法。ThreadPoolExecutor 内的线程无法将另一个任务添加到队列中并等待其完成:一旦线程进入等待状态,就无法使用该线程执行它的某个子任务了。另一方面,ForkJoinPool 则允许其中的线程创建新任务,之后挂起当前的任务。当任务被挂起时,线程可以执行其他等待的任务。

举个简单的例子:比如说有个 double 数组,我们想计算数组中小于 0.5 的元素的个数。顺序扫描比较简单(可能还有优势,本节后面会看到),但是为了说明问题,现在把数组划分为子数组,并行扫描(模仿更复杂的快速排序和其他分治算法)。使用 ForkJoinPool 实现这一功能的代码如下:

  1. public class ForkJoinTest {
  2. private double[] d;
  3. private class ForkJoinTask extends RecursiveTask<Integer> {
  4. private int first;
  5. private int last;
  6. public ForkJoinTask(int first, int last) {
  7. this.first = first;
  8. this.last = last;
  9. }
  10. protected Integer compute() {
  11. int subCount;
  12. if (last - first < 10) {
  13. subCount = 0;
  14. for (int i = first; i <= last; i++) {
  15. if (d[i] < 0.5)
  16. subCount++;
  17. }
  18. }
  19. else {
  20. int mid = (first + last) >>> 1;
  21. ForkJoinTask left = new ForkJoinTask(first, mid);
  22. left.fork();
  23. ForkJoinTask right = new ForkJoinTask(mid + 1, last);
  24. right.fork();
  25. subCount = left.join();
  26. subCount += right.join();
  27. }
  28. return subCount;
  29. }
  30. }
  31. public static void main(String[] args) {
  32. d = createArrayOfRandomDoubles();
  33. int n = new ForkJoinPool().invoke(new ForkJoinTask(0, 9999999));
  34. System.out.println("Found " + n + " values");
  35. }
  36. }

fork()join() 方法是这里的关键:没有这些方法,实现这类递归会非常痛苦(在由 ThreadPoolExecutor 执行的任务中就没有这些方法)。这些方法使用了一系列内部的、从属于每个线程的队列来操纵任务,并将线程从执行一个任务切换到执行另一个。细节对开发者是透明的,不过如果对算法感兴趣,其代码读起来也很有意思。这里我们重点关注的是性能:ForkJoinPoolThreadPoolExecutor 这两个类之间有什么权衡取舍呢?

首先,fork/join 范型所实现的挂起,使得所有任务可以交由少量的线程执行。使用该示例代码计算包含 1000 万个元素的数组中的 double 值,会创建 200 多万个任务,但这些任务很容易交由少量一些线程执行(甚至是一个线程,如果这对运行测试的机器有意义的话)。使用 ThreadPoolExecutor 运行类似算法则需要 200 多万个线程,因为每个线程必须等待其子任务完成,而且那些子任务只有在池中有可用线程时才能完成。有了 fork/join,我们可以实现用 ThreadPoolExecutor 无法实现的算法,这就是一个性能优势。

尽管分治技术非常强大,但是滥用也可能会导致性能变糟糕。在计数的这个例子中,可以使用一个线程来扫描数组并计数,虽然未必能像并行运行 fork/join 算法那样快。然而,把原数组划分为多个断,使用 ThreadPoolExecutor 让多个线程扫描数组,也是非常容易的:

  1. public class ThreadPoolTest {
  2. private double[] d;
  3. private class ThreadPoolExecutorTask implements Callable<Integer> {
  4. private int first;
  5. private int last;
  6. public ThreadPoolExecutorTask(int first, int last) {
  7. this.first = first;
  8. this.last = last;
  9. }
  10. public Integer call() {
  11. int subCount = 0;
  12. for (int i = first; i <= last; i++) {
  13. if (d[i] < 0.5) {
  14. subCount++;
  15. }
  16. }
  17. return subCount;
  18. }
  19. }
  20. public static void main(String[] args) {
  21. d = createArrayOfRandomDoubles();
  22. ThreadPoolExecutor tpe = new ThreadPoolExecutor(4, 4,
  23. Long.MAX_VALUE,
  24. TimeUnit.SECONDS,
  25. new LinkedBlockingQueue());
  26. Future[] f = new Future[4];
  27. int size = d.length / 4;
  28. for (int i = 0; i < 3; i++) {
  29. f[i] = tpe.submit(
  30. new ThreadPoolExecutorTask(i * size, (i + 1) * size - 1);
  31. }
  32. f[3] = tpe.submit(new ThreadPoolExecutorTask(3 * size, d.length - 1);
  33. int n = 0;
  34. for (int i = 0; i < 4; i++) {
  35. n += f.get();
  36. }
  37. System.out.println("Found " + n + " values");
  38. }
  39. }

在一个配备了 4 个 CPU 的机器上,这段代码可以充分利用所有可用的 CPU,并行处理数组,同时避免像 fork/join 示例中那样创建和排队处理 200 万个任务。可以预见性能会快些,如表 9-4 所示。

表9-4:对1亿个元素做计数处理

线程数 ForkJoinPool(秒) ThreadPoolExecutor(秒)
1 3.2 0.31
4 1.9 0.15

测试所用的机器有 4 个 CPU,4 GB 固定内存。测试中,ThreadPoolExecutor 完全不需要 GC,而每个 ForkJoinPool 测试会花 1.2 秒在 GC 上。对于性能差异而言,这一点所占比重很大,但这并非故事的全部:创建和管理任务对象的开销也会伤害 ForkJoinPool 的性能。如果有类似的替代方案,很可能会更快,至少在这个简单的例子中是这样。

ForkJoinPool 还有一个额外的特性,它实现了工作窃取(work-stealing)。这基本上就是一个实现细节了;这意味着池中的每个线程都有自己所创建任务的队列。线程会优先处理自己队列中的任务,但如果这个队列已空,它会从其他线程的队列中窃取任务。其结果是,即使 200 万个任务中有一个需要很长的执行时间,ForkJoinPool 中的其他线程也可以完成其余的随便什么任务。ThreadPoolExecutor 则不会这样:如果一个任务需要很长的时间,其他线程并不能处理额外的工作。

示例代码先是计算数组中小于 0.5 的元素数。此外,如果代码中还计算了一个新的值,并保存到数组中了,会发生什么?一个没有实际意义但却是 CPU 密集型的实现可以执行以下代码:

  1. for (int i = first; i <= last; i++) {
  2. if (d[i] < 0.5) {
  3. subCount++;
  4. }
  5. for (int j = 0; j < d.length - i; j++) {
  6. for (int k = 0; k < 100; k++) {
  7. dummy = j * k + i; // dummy是一个volatile变量,因此发生了多次写操作
  8. d[i] = dummy;
  9. }
  10. }
  11. }

因为用 j 索引的外部循环是基于元素在数组中的位置处理的,所以计算所需要的时间和元素位置成比例关系:计算 d[0] 的值需要很长的时间,而计算 d[d.length - 1] 则只需要很短的时间。

简单地将数组分为 4 段,用 ThreadPoolExecutor 处理,这个测试有一个不好的地方。计算数组第 1 段的线程需要很长的时间才能完成,比处理数组最后一段的第 4 个线程所需的时间长得多。一旦第 4 个线程结束,它就会处于空闲状态:所有线程都要等第 1 个线程完成它的耗时较长的任务。

在粒度为 200 万个任务的 ForkJoinPool 中,尽管有一个线程会忙于针对数组中的前 10 个元素的非常耗时的计算,但是其余线程都有工作可做,在大部分测试过程中,CPU 会保持忙碌。区别如表 9-5 所示。

表9-5:处理包含10 000个元素的数组的时间

线程数 ForkJoinPool(秒) ThreadPoolExecutor(秒)
1 54.5 53.3
4 16.6 24.2

当池中只有一个线程时,计算所花的时间基本一样。这可以理解:不管池如何实现,计算量是一样的;而且因为那些计算绝对不会并行进行,所以可以预计它们所需的时间是一样的(尽管创建 200 万个任务会有少量开销)。但是当池中包含 4 个线程时,ForkJoinPool 中任务的粒度会带来一个决定性的优势:几乎在测试的整个过程中,都能保持 CPU 的忙碌状态。

这种情况就叫作“不均衡”,因为某些任务所花的时间比其他任务长(因此前面例子中的任务可以说是“均衡的”)。一般而言,如果任务是均衡的,使用分段的 ThreadPoolExecutor 性能更好;而如果任务是不均衡的,则使用 ForkJoinPool 性能更好。

还有一个更微妙的性能方面的建议:请仔细考虑 fork/join 范型应该在哪个点结束递归。在这个例子中,我信手选择了当数组大小小于 10 时结束。如果在数组大小为 250 万时停止递归,那么 fork/join 测试(在搭载 4 个 CPU 的机器上,处理 1000 万个元素的平衡代码)会只创建 4 个任务,其性能基本和 ThreadPoolExecutor 一样。

另一方面,对于这个例子,在非平衡的测试中,继续递归会有更好的性能,即使创建更多任务。表 9-6 给出了一些有代表性的数据点。

表9-6:处理包含10 000个元素的数组的时间

叶子数组大小 ForkJoinPool(秒)
20 17.8
10 16.6
5 15.6
1 16.8

自动并行化

Java 8 向 Java 中引入了自动并行化特定种类代码的能力。这种并行化就依赖于 ForkJoinPool 类的使用。Java 8 为这个类加入了一个新特性:一个公共的池,可供任何没有显式指定给某个特定池的 ForkJoinTask 使用。这个公共池是 ForkJoinPool 类的一个 static 元素,其大小默认设置为目标机器上的处理器数。

这种并行化在 Arrays 类的很多新方法中都会发生,包括使用并行快速排序处理数组的方法,操作数组的每个元素的方法,等等。在 Java 8 的 Stream 特性中也有应用,支持在集合中的每个元素上(或顺序或并行地)执行操作。Stream 的一些基本的性能特性会在第 12 章讨论;在本节中,我们看一下 Stream 是如何自动地并行处理的。

给定一个包含一系列整型数的集合,下列代码会计算与给定整型数匹配的股票代号的价格历史:

  1. Stream<Integer> stream = arrayList.parallelStream();
  2. stream.forEach(a -> {
  3. String symbol = StockPriceUtils.makeSymbol(a);
  4. StockPriceHistory sph = new StockPriceHistoryImpl(symbol, startDate,
  5. endDate, entityManager);
  6. });

这段代码会并行计算模拟价格历史:forEach() 方法将为数组列表中的每个元素创建一个任务,每个任务都会由公共的 ForkJoinTask 池处理。它在功能上与本章开始所做的测试是等价的,那个测试是用一个线程池来并行计算价格历史(不过与显式使用线程池相比,这段代码写起来更容易)。

设置 ForkJoinTask 池的大小和设置其他任何线程池同样重要。默认情况下,公共池的线程数等于机器上的 CPU 数。如果在同一机器上运行着多个 JVM,则应限制这个线程数,以防这些 JVM 彼此争用 CPU。类似地,如果 Servlet 代码会执行某个并行任务,而我们想确保 CPU 可供其他任务使用,可以考虑减小公共池的线程数。另外,如果公共池中的任务会阻塞等待 I/O 或其他数据,也可以考虑增大线程数。

这个值可以通过设置系统属性 -Djava.util.concurrent.ForkJoinPool.common.parallelism=N 来指定。

在本章前面的表 9-1 中,曾经对比过线程数对并行计算股票历史价格的影响。表 9-7 使用共同的 ForkJoinPool(将 parallelism 系统属性设置为给定的值)将那个数据与 forEach() 构造作了比较。

表9-7:计算10 000支模拟股票价格历史所需的时间

线程数 ThreadPoolExecutor(秒) ForkJoinPool(秒)
1 255.6 135.4
2 134.8 110.2
4 77.0 96.5
8 81.7 84.0
15 85.6 84.6

默认情况下,公共池有 4 个线程(在这个配置了 4 个 CPU 的机器上),所以表中的第 3 行为一般情况。在线程数为 1 和 2 时,这类结果会让性能工程师很不开心:它们看上去很不协调,而当某一项测试出现这样的情况时,最常见的原因是测试错误。这里的原因是 forEach() 方法有些奇怪的行为:它使用了一个线程执行语句,还使用了公共池中的线程处理来自 Stream 的数据。即使在第 1 个测试中,公共池也是配置为使用一个线程,总的还是会使用两个线程来计算结果。(因此,使用了 2 个线程的 ThreadPoolExecutor 和使用了 1 个线程的 ForkJoinPool 的耗时基本相同。)

在使用并行 Stream 构造或其他自动并行化特性时,如果需要调整公共池的大小,可以考虑将所需的值减 1。

9.2 ForkJoinPool - 图2 快速小结

1. ForkJoinPool 类应该用于递归、分治算法。

2. 应该花些心思来确定,算法中的递归任务何时结束最为合适。创建太多任务会降低性能,但如果任务太少,而任务所需的执行时间又长短不一,也会降低性能。

3. Java 8 中使用了自动并行化的特性会用到一个公共的 ForkJoinPool 实例。我们可能需要根据实际情况调整这个实例的默认大小。