7.1 并行流
第4章简要提到过使用Stream接口能非常方便地并行处理其元素:对收集源调用parallelStream方法就能将集合转换为并行流。并行流就是一个把内容拆分成多个数据块,用不同线程分别处理每个数据块的流。这样一来,你就可以自动地把工作负荷分配到多核处理器的所有核,让它们都忙起来。我们用一个简单的例子来验证一下这个思想。
假设你需要写一个方法,接受数字
作为参数,并返回从1到给定参数的所有数字的和。一个直接(也许有点土)的方法是生成由一个数字组成的无限流,将它限制到传入的数目,然后使用对两个数字求和的BinaryOperator来归约这个流,代码如下所示:
public long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1) ←---- 生成自然数无限流.limit(n) ←---- 限制到前n个数.reduce(0L, Long::sum); ←---- 对所有数字求和来归约流}
如果采用更为传统的Java术语,上述代码与下面这种迭代方式其实是等价的:
public long iterativeSum(long n) {long result = 0;for (long i = 1L; i <= n; i++) {result += i;}return result;}
这似乎是利用并行处理的好机会,特别是
很大的时候。那怎么入手呢?你要对结果变量进行同步吗?用多少个线程呢?谁负责生成数呢?谁来做加法呢?
根本用不着担心。用并行流的话,这问题就简单多了!
7.1.1 将顺序流转换为并行流
对顺序流调用parallel方法,你可以将流转换成并行流,让前面的函数式归约过程(也就是求和)并行执行:
public long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel() ←---- 将流转换为并行流.reduce(0L, Long::sum);}
在上面的代码中,对流中所有数字求和的归约过程,其执行方式与5.4.1节介绍的大同小异,不同之处在于现在Stream由内部被分成了几块。因此能对不同的块执行独立并行的归约操作,如图7-1所示。最后,各个子流部分归约的返回值会被同一个归约操作整合,得到整个原始流的归约结果。

图 7-1 并行归约操作
请注意,实际上,对顺序流调用parallel方法并不意味着流本身有任何实际的变化。它其实仅仅在内部设置了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并行执行。类似地,你只需要对并行流调用sequential方法就可以把它变成顺序流。请注意,你可能以为把这两个方法结合起来,就可以更精细地控制遍历流时哪些操作要并行执行,哪些要顺序执行。例如,你可以这样做:
stream.parallel().filter(...).sequential().map(...).parallel().reduce();
但最后一次parallel或sequential调用会影响整个流水线。在本例中,流水线会并行执行,因为最后调用的是它。
配置并行流使用的线程池
看看流的
parallel方法,你可能会想,并行流用的线程是从哪儿来的?有多少个?怎么自定义这个过程呢?并行流内部使用了默认的
ForkJoinPool(7.2节会进一步讲到分支/合并框架),它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().availableProcessors()得到的。但是这并非一成不变,你可以通过系统属性
java.util.concurrent.ForkJoinPool.common.parallelism来修改线程池大小,如下所示:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");这是一个全局设置,因此它会对代码中所有的并行流产生影响。反过来说,目前我们还无法专为某个并行流指定这个值。一般而言,让
ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很充足的理由,否则强烈建议你不要修改它。
回到数字求和练习的例子,我们说过,在多核处理器上运行并行版本时,会有显著的性能提升。现在你有三个方法,用三种不同的方式(迭代式、顺序归约和并行归约)做完全相同的操作,让我们看看谁最快吧!
7.1.2 测量流性能
我们说并行求和法比顺序、迭代法性能好,然而并没有给出实锤的依据。软件工程上靠猜绝对不是什么好主意!优化性能时,你应该始终遵循的黄金法则是:测量,测量,再测量。基于这个思想,我们使用名为Java微基准套件(Java microbenchmark harness,JMH)的库实现了一个微基准测试。JMH是一个以声明方式帮助大家创建简单、可靠微基准测试的工具集,它支持Java,也支持可以运行在Java虚拟机(Java virtual machine,JVM)上的其他语言。事实上,为运行于JVM上的程序创建正确且有价值的基准测试并不是件容易事儿,因为你需要考虑大量可能影响性能的因素,譬如HotSpot虚拟机的热身时间。恰当的热身时间可以提升虚拟机对字节码的优化,减小垃圾收集的开销。如果你使用Maven作为编译工具,那么启动JMH只需要在你项目的pom.xml文件(该文件定义了Maven的构建过程)中添加几行依赖,如下所示:
<dependency><groupId>org.openjdk.jmh</groupId><artifactId>jmh-core</artifactId><version>1.17.4</version></dependency><dependency><groupId>org.openjdk.jmh</groupId><artifactId>jmh-generator-annprocess</artifactId><version>1.17.4</version></dependency>
上述第一个库是JMH的核心实现,第二个库包含了帮助产生Java归档(JAR)文件的注解处理器,一旦你在Maven配置文件中添加了下面的配置,就可以通过它非常方便地执行微基准测试了:
<build><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><finalName>benchmarks</finalName><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.openjdk.jmh.Main</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
做完这一步,你就能很轻松地对本节开头介绍的sequentialSum方法执行基准测试了,如下所示。
代码清单 7-1 测量对前
个自然数求和的函数的性能
@BenchmarkMode(Mode.AverageTime) ←---- 测量用于执行基准测试目标方法所花费的平均时间@OutputTimeUnit(TimeUnit.MILLISECONDS) ←---- 以毫秒为单位,打印输出基准测试的结果@Fork(2, jvmArgs={"-Xms4G", "-Xmx4G"}) ←---- 采用4Gb的堆,执行基准测试两次以获得更可靠的结果public class ParallelStreamBenchmark {private static final long N= 10_000_000L;@Benchmark ←---- 基准测试的目标方法public long sequentialSum() {return Stream.iterate(1L, i -> i + 1).limit(N).reduce( 0L, Long::sum);}@TearDown(Level.Invocation) ←---- 尽量在每次基准测试迭代结束后都进行一次垃圾回收public void tearDown() {System.gc();}}
编译这个类时,你之前配置的Maven插件会生成一个名为benchmarks.jar的JAR文件,你可以像下面这样执行它:
java -jar ./target/benchmarks.jar ParallelStreamBenchmark
我们为基准测试特别配置了大的堆,希望尽量避免垃圾回收带来的影响。出于同样的原因,还试图在每次基准测试迭代完成之后强制进行垃圾回收。不得不说,即便已经做了这些准备,基准测试的结果也不可尽信。太多的因素都可能影响执行的时间,譬如你的机器配备了多少个CPU核!你可以尝试在自己的机器上执行本书代码库中的代码,看看结果是什么情况。
通过前一种方式启动,命令会让JMH执行20次基准测试的方法,帮助HotSpot对代码进行充分地热身,接着再次执行20次以上的迭代,以计算基准测试的最终结果。这20+20次迭代是JMH缺失的行为,不过你可以通过JMH声明,或者更简单的命令行选项-w和-i标志位设置新的值。在配备了Intel i7-4600U 2.1 GHz四核CPU的机器上执行该基准测试,打印输出的结果如下:
Benchmark Mode Cnt Score Error UnitsParallelStreamBenchmark.sequentialSum avgt 40 121.843 ± 3.062 ms/op
你应该可以预见到,采用传统for循环迭代的版本,其执行速度会快很多,因为它在更低层执行,更重要的是这种情况不需要对基础类型值执行任何装箱或者拆箱操作。通过在基准测试类添加第二个方法,可以验证这一直觉,如代码清单7-1所示,该方法也使用@Benchmark进行了注解:
@Benchmarkpublic long iterativeSum() {long result = 0;for (long i = 1L; i <= N; i++) {result += i;}return result;}
在测试机上执行第二个基准测试(你可能还需要注释掉第一个基准测试,避免它再次执行),我们得到了下面这组数据:
Benchmark Mode Cnt Score Error UnitsParallelStreamBenchmark.iterativeSum avgt 40 3.278 ± 0.192 ms/op
结果确认了我们的假设:跟预期一致,迭代版本与前一个采用顺序流版本比较起来,执行速度快了40多倍。现在,采用并发流的版本做同样的事情,把该方法加入基准测试类。我们得到了下面这组数据:
Benchmark Mode Cnt Score Error UnitsParallelStreamBenchmark.parallelSum avgt 40 604.059 ± 55.288 ms/op
这个结果令人相当失望,求和方法的并行版本并没能充分利用四核CPU的处理能力,与顺序版本比起来,它甚至慢了五倍!你如何解释这个意外的结果呢?实际上这儿存在两个相互交缠的问题:
iterate生成的是装箱的对象,必须拆箱成数字才能求和;- 我们很难把
iterate分成多个独立块来并行执行。
第二个问题更有意思一点,因为你必须意识到某些流操作比其他操作更容易并行化。具体来说,iterate很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果,如图7-2所示。

图 7-2 iterate在本质上是顺序的
这意味着,在这个特定情况下,归约进程不是像图7-1那样进行的。整张数字列表在归约过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。把流标记成并行,你其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。
这就说明了并行编程可能很复杂,有时候甚至有点违反直觉。如果用得不对(比如采用了一个不易并行化的操作,如iterate),它甚至可能让程序的整体性能更差,所以在调用那个看似神奇的parallel操作时,了解背后到底发生了什么是很有必要的。
使用更有针对性的方法
那到底要怎么利用多核处理器,用流来高效地并行求和呢?第5章中讨论过一个叫LongStream.rangeClosed的方法。这个方法与iterate相比有两个优点。
LongStream.rangeClosed直接产生原始类型的long数字,没有装箱拆箱的开销。LongStream.rangeClosed会生成数字范围,很容易拆分为独立的小块。例如,范围1~20可分为1~5、6~10、11~15和16~20。
首先,把下面的方法添加到基准测试类中,看看采用顺序流时的性能如何,拆箱的开销到底要不要紧:
@Benchmarkpublic long rangedSum() {return LongStream.rangeClosed(1, N).reduce(0L, Long::sum);}
这一次的输出是:
Benchmark Mode Cnt Score Error UnitsParallelStreamBenchmark.rangedSum avgt 40 5.315 ± 0.285 ms/op
这个数值流比前面那个用iterate工厂方法生成数字的顺序执行版本要快得多,因为数值流避免了非针对性流那些没必要的自动装箱和拆箱操作。由此可见,选择适当的数据结构往往比并行化算法更重要。但要是对这个新版本应用并行流呢?
@Benchmarkpublic long parallelRangedSum() {return LongStream.rangeClosed(1, N).parallel().reduce(0L, Long::sum);}
现在把这个方法添加到我们获得的基准测试类中 :
Benchmark Mode Cnt Score Error UnitsParallelStreamBenchmark.parallelRangedSum avgt 40 2.677 ± 0.214 ms/op
终于,我们得到了一个比顺序执行更快的并行归约,因为这一次归约操作可以像图7-1那样执行了。这也表明,使用正确的数据结构然后使其并行工作能够保证最佳的性能。注意,最新的版本也比最初的迭代版本快大约20%,这表明如果使用恰当,函数式程序风格能帮助我们充分利用现代多核处理器的并行处理能力,并且,与命令式编程比较起来,这种方式更简单,也更直接。
尽管如此,请记住,并行化并不是没有代价的。并行化过程本身需要对流做递归划分,把每个子流的归约操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在核中并行执行工作的时间比在核之间传输数据的时间长。总而言之,很多情况下不可能或不方便并行化。然而,在使用并行Stream加速代码之前,你必须确保用得对;如果结果错了,算得快就毫无意义了。让我们来看一个常见的陷阱。
7.1.3 正确使用并行流
错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。下面是另一种实现对前
个自然数求和的方法,但这会改变一个共享累加器:
public long sideEffectSum(long n) {Accumulator accumulator = new Accumulator();LongStream.rangeClosed(1, n).forEach(accumulator::add);return accumulator.total;}public class Accumulator {public long total = 0;public void add(long value) { total += value; }}
这种代码非常普遍,特别是对那些熟悉指令式编程范式的程序员来说。这段代码和你习惯的那种指令式迭代数字列表的方式很像:初始化一个累加器,一个个遍历列表中的元素,把它们和累加器相加。
那这种代码又有什么问题呢?不幸的是,它真的无可救药,因为它在本质上就是顺序的。每次访问total都会出现数据竞争。如果你尝试用同步来修复,那就完全失去并行的意义了。为了说明这一点,试着把Stream变成并行的:
public long sideEffectParallelSum(long n) {Accumulator accumulator = new Accumulator();LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);return accumulator.total;}
用代码清单7-1中的测试框架来执行这个方法,并打印每次执行的结果:
System.out.println("SideEffect parallel sum done in: " +measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + "msecs" );
你可能会得到类似于下面这种输出:
Result: 5959989000692Result: 7425264100768Result: 6827235020033Result: 7192970417739Result: 6714157975331Result: 7497810541907Result: 6435348440385Result: 6999349840672Result: 7435914379978Result: 7715125932481SideEffect parallel sum done in: 49 msecs
这回方法的性能无关紧要了,唯一要紧的是每次执行都会返回不同的结果,都离正确值50000005000000差很远。这是由于多个线程在同时访问累加器,执行total += value,而这一句虽然看似简单,却不是一个原子操作。问题的根源在于,forEach中调用的方法有副作用,它会改变多个线程共享的对象的可变状态。要是你想用并行Stream又不想引发类似的意外,就必须避免这种情况。
现在你知道了,共享可变状态会影响并行流以及并行计算。第18章和第19章详细讨论函数式编程的时候,还会谈到这一点。现在,记住要避免共享可变状态,确保并行Stream得到正确的结果。接下来,我们会看到一些实用建议,你可以由此判断什么时候可以利用并行流来提升性能。
7.1.4 高效使用并行流
一般而言,想给出任何关于什么时候该用并行流的定量建议都是不可能也毫无意义的,因为任何类似于“仅当超过1000个元素的时候才用并行流”的建议对于某台特定机器上的某个特定操作可能是对的,但在略有差异的另一种情况下可能就是大错特错。尽管如此,我们至少可以提出一些定性意见,帮你决定某个特定情况下是否有必要使用并行流。
- 如果有疑问,测量。把顺序流转成并行流轻而易举,却不一定是好事。本节中已经指出,并行流并不总是比顺序流快。此外,并行流有时候会和你的直觉不一致,所以在考虑选择顺序流还是并行流时,第一个也是最重要的建议就是用适当的基准来检查其性能。
- 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(
IntStream、LongStream和DoubleStream)来避免这种操作,但凡有可能都应该用这些流。 - 有些操作本身在并行流上的性能就比顺序流差。特别是
limit和findFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny会比findFirst性能好,因为它不一定要按顺序来执行。你总是可以调用unordered方法来把有序流变成无序流。那么,如果你需要流中的
个元素而不是专门要前
个的话,对无序并行流调用limit可能会比单个有序流(比如数据源是一个List)更高效。 - 还要考虑流的操作流水线的总计算成本。设
是要处理的元素的总数,
是一个元素通过流水线的大致处理成本,则
就是这个对成本的一个粗略的定性估计。
值较高就意味着使用并行流时性能好的可能性比较大。 - 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
- 要考虑流背后的数据结构是否易于分解。例如,
ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。最后,你将在7.3节中学到,可以自己实现Spliterator来完全掌控分解过程。 - 流自身的特点以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个
SIZED流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数无法预测,从而导致流本身的大小未知。 - 还要考虑终端操作中合并步骤的代价是大是小(例如
Collector中的combiner方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。
表7-1按照可分解性总结了一些流数据源适不适于并行。
表 7-1 流的数据源和可分解性
| 源 | 可分解性 |
|---|---|
ArrayList
| 极佳 |
LinkedList
| 差 |
IntStream.range
| 极佳 |
Stream.iterate
| 差 |
HashSet
| 好 |
TreeSet
| 好 |
最后,我们还要强调并行流背后使用的基础架构是Java 7中引入的分支/合并框架。并行汇总的示例证明了要想正确使用并行流,了解它的内部原理至关重要,所以下一节会仔细研究分支/合并框架。
