3.12 流的拼接

问题

用户希望将两个或多个流合并为一个流。

方案

Stream.concat 方法用于合并两个流。如果需要合并多个流,请使用 Stream.flatMap 方法。

讨论

假设我们从多个信源获取到数据,且希望使用流来处理其中的每个元素。一种方案是采用 Stream 接口定义的 concat 方法,其签名如下:

  1. static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)

concat 方法将创建一个惰性的拼接流(lazily concatenated stream),其元素是第一个流的所有元素,后跟第二个流的所有元素。根据 Javadoc 的描述,如果两个输入流均为有序流,则生成的流也是有序流;如果某个输入流为并行流,则生成的流也是并行流。关闭生成的流也会关闭两个输入流。

3.12 流的拼接 - 图1 两个输入流所包含的元素类型必须相同。

例 3-59 显示了拼接两个流的简单示例。

例 3-59 拼接两个流

  1. @Test
  2. public void concat() throws Exception {
  3. Stream<String> first = Stream.of("a", "b", "c").parallel();
  4. Stream<String> second = Stream.of("X", "Y", "Z");
  5. List<String> strings = Stream.concat(first, second)
  6. .collect(Collectors.toList());
  7. List<String> stringList = Arrays.asList("a", "b", "c", "X", "Y", "Z");
  8. assertEquals(stringList, strings);
  9. }

➊ 将第二个流的元素附加到第一个流的元素之后

如果需要添加第三个流,可以嵌套使用拼接操作,如例 3-60 所示。

例 3-60 拼接多个流(concat 方法)

  1. @Test
  2. public void concatThree() throws Exception {
  3. Stream<String> first = Stream.of("a", "b", "c").parallel();
  4. Stream<String> second = Stream.of("X", "Y", "Z");
  5. Stream<String> third = Stream.of("alpha", "beta", "gamma");

  6. List&lt;String&gt; strings = Stream.concat(Stream.concat(first, second), third)
  7.         .collect(Collectors.toList());
  8. List&lt;String&gt; stringList = Arrays.asList(&#34;a&#34;, &#34;b&#34;, &#34;c&#34;,
  9.     &#34;X&#34;, &#34;Y&#34;, &#34;Z&#34;, &#34;alpha&#34;, &#34;beta&#34;, &#34;gamma&#34;);
  10. assertEquals(stringList, strings);
  11. }

尽管嵌套拼接的方案可行,但请注意 Javadoc 所做的注释:

通过重复拼接操作构建流时应谨慎行事。访问一个深度拼接流中的元素可能导致深层调用链(deep call chain)甚至抛出 StackOverflowException

换言之,concat 方法实际上构建了一个流的二叉树(binary tree),使用过多就会变得难以处理。

另一种方案是采用 reduce 方法执行多次拼接操作,如例 3-61 所示。

例 3-61 拼接多个流(reduce 方法)

  1. @Test
  2. public void reduce() throws Exception {
  3. Stream<String> first = Stream.of("a", "b", "c").parallel();
  4. Stream<String> second = Stream.of("X", "Y", "Z");
  5. Stream<String> third = Stream.of("alpha", "beta", "gamma");
  6. Stream<String> fourth = Stream.empty();

  7. List&lt;String&gt; strings = Stream.of(first, second, third, fourth)
  8.         .reduce(Stream.empty(), Stream::concat)   
  9.         .collect(Collectors.toList());
  10. List&lt;String&gt; stringList = Arrays.asList(&#34;a&#34;, &#34;b&#34;, &#34;c&#34;,
  11.     &#34;X&#34;, &#34;Y&#34;, &#34;Z&#34;, &#34;alpha&#34;, &#34;beta&#34;, &#34;gamma&#34;);
  12. assertEquals(stringList, strings);
  13. }

➊ 对空流使用 reduce 方法和二元运算符

由于用作方法引用的 concat 方法属于二元运算符,上述程序同样有效。不过请注意,虽然代码更简洁,但并不能解决潜在的栈溢出(stack overflow)问题。

有鉴于此,在合并多个流时,使用 flatMap 方法成为一种自然而然的解决方案,如例 3-62 所示。

例 3-62 拼接多个流(flatMap 方法)

  1. @Test
  2. public void flatMap() throws Exception {
  3. Stream<String> first = Stream.of("a", "b", "c").parallel();
  4. Stream<String> second = Stream.of("X", "Y", "Z");
  5. Stream<String> third = Stream.of("alpha", "beta", "gamma");
  6. Stream<String> fourth = Stream.empty();

  7. List&lt;String&gt; strings = Stream.of(first, second, third, fourth)
  8.         .flatMap(Function.identity())
  9.         .collect(Collectors.toList());
  10. List&lt;String&gt; stringList = Arrays.asList(&#34;a&#34;, &#34;b&#34;, &#34;c&#34;,
  11.     &#34;X&#34;, &#34;Y&#34;, &#34;Z&#34;, &#34;alpha&#34;, &#34;beta&#34;, &#34;gamma&#34;);
  12. assertEquals(stringList, strings);
  13. }

上述代码可以运行,但仍然有其不足。如果任何一个输入流为并行流,那么通过 concat 方法创建的流也是并行流,但 flatMap 方法返回的则不是并行流(例 3-63)。

例 3-63 并行流还是非并行流

  1. @Test
  2. public void concatParallel() throws Exception {
  3. Stream<String> first = Stream.of("a", "b", "c").parallel();
  4. Stream<String> second = Stream.of("X", "Y", "Z");
  5. Stream<String> third = Stream.of("alpha", "beta", "gamma");

  6. Stream&lt;String&gt; total = Stream.concat(Stream.concat(first, second), third);
  7. assertTrue(total.isParallel());
  8. }

  9. @Test

  10. public void flatMapNotParallel() throws Exception {

  11. Stream<String> first = Stream.of("a", "b", "c").parallel();

  12. Stream<String> second = Stream.of("X", "Y", "Z");

  13. Stream<String> third = Stream.of("alpha", "beta", "gamma");

  14. Stream<String> fourth = Stream.empty();

  15. Stream&lt;String&gt; total = Stream.of(first, second, third, fourth)
  16.         .flatMap(Function.identity());
  17. assertFalse(total.isParallel());
  18. }

尽管如此,只要尚未开始处理数据,总是可以通过调用 parallel 方法来实现并行流(例 3-64)。

例 3-64 将 flatMap 方法返回的流转换为并行流

  1. @Test
  2. public void flatMapParallel() throws Exception {
  3. Stream<String> first = Stream.of("a", "b", "c").parallel();
  4. Stream<String> second = Stream.of("X", "Y", "Z");
  5. Stream<String> third = Stream.of("alpha", "beta", "gamma");
  6. Stream<String> fourth = Stream.empty();

  7. Stream&lt;String&gt; total = Stream.of(first, second, third, fourth)
  8.         .flatMap(Function.identity());
  9. assertFalse(total.isParallel());
  10. total = total.parallel();
  11. assertTrue(total.isParallel());
  12. }

如上所示,由于 flatMap 属于中间操作,可以通过 parallel 方法对流进行修改。

简而言之,concat 方法适用于两个流的拼接,可以作为一种一般性归约操作使用,flatMap 方法则更具普遍意义。

另见

感兴趣的话可以阅读一篇不错的博文“Efficient multiple-stream concatenation in Java”,文章描述了将多个流合并为一个流时需要考虑的性能问题。

有关 Stream.flatMap 方法的讨论请参见范例 3.11。