3.9 查找流的第一个元素

问题

用户希望查找满足流中特定条件的第一个元素。

方案

应用筛选器之后使用 java.util.stream.Stream 接口定义的 findFirstfindAny 方法。

讨论

Stream 接口定义的 findFirst 方法返回描述流中第一个元素的 Optional,而 findAny 方法返回描述流中某个元素的 Optional。两种方法都不传入参数,意味着映射或筛选操作已经完成。

例如,给定一个整数列表,为查找第一个偶数,可以在应用偶数筛选器后使用 findFirst 方法,如例 3-46 所示。

例 3-46 查找第一个偶数

  1. Optional<Integer> firstEven = Stream.of(3, 1, 4, 1, 5, 9, 2, 6, 5)
  2. .filter(n -> n % 2 == 0)
  3. .findFirst();
  4. System.out.println(firstEven);

➊ 打印 Optional[4]

如果流为空,则返回值是一个空 Optional(如例 3-47 所示)。

例 3-47 流为空时使用 findFirst 方法

  1. Optional<Integer> firstEvenGT10 = Stream.of(3, 1, 4, 1, 5, 9, 2, 6, 5)
  2. .filter(n -> n > 10)
  3. .filter(n -> n % 2 == 0)
  4. .findFirst();
  5. System.out.println(firstEvenGT10);

➊ 打印 Optional.empty

上述代码在应用筛选器后返回第一个元素,这是否意味着要做大量无用功呢?为什么要对所有元素进行取模运算,然后只选择第一个元素呢?由于流元素实际上是逐一进行处理的,这不是一个问题,相关讨论请参见范例 3.13。

如果流不存在出现顺序(encounter order),它可能返回任何元素。不过在本例中,流确实具有出现顺序,因此无论采用顺序流还是并行流进行搜索,“第一个”偶数始终是 4。详见例 3-48。

例 3-48 在并行流中使用 firstEven

  1. firstEven = Stream.of(3, 1, 4, 1, 5, 9, 2, 6, 5)
  2. .parallel()
  3. .filter(n -> n % 2 == 0)
  4. .findFirst();
  5. System.out.println(firstEven);

➊ 始终打印 Optional[4]

初看之下有些奇怪,为什么在同时处理多个数字时仍然会得到同一个值呢?原因在于出现顺序的概念。

Java API 将出现顺序定义为数据源使其元素可用的顺序。ListArray 都有出现顺序,但 Set 没有。

BaseStream 接口(Stream 的父接口)还定义了一种名为 unordered 的方法,它可能返回(也可能不返回)一个无序流作为中间操作。

set 与出现顺序

虽然 HashSet 实例本身没有出现顺序,但如果重复初始化包含相同数据的 HashSet 实例(Java 8),则每次返回的元素顺序都相同,这意味着每次调用 findFirst 方法也会得到相同的结果。根据 Javadoc 的描述,findFirst 用于无序流时可能会获得不同的结果,但当前的实现不会因为流是无序的而改变其行为。

如果希望获得一个具有不同出现顺序的 Set,可以通过添加和删除足够多的元素来强制进行再散列操作(rehashing)。例如:

  1. List<String> wordList = Arrays.asList(
  2. "this", "is", "a", "stream", "of", "strings");
  3. Set<String> words = new HashSet<>(wordList);
  4. Set<String> words2 = new HashSet<>(words);
  5.  
  6. // 接下来,添加和删除足够多的元素来强制进行再散列操作
  7. IntStream.rangeClosed(0, 50).forEachOrdered(i ->
  8. words2.add(String.valueOf(i)));
  9. words2.retainAll(wordList);
  10.  
  11. // 这些集合是相等的,但具有不同的元素排序
  12. System.out.println(words.equals(words2));
  13. System.out.println("Before: " + words);
  14. System.out.println("After : " + words2);

输出结果类似于:

  1. true
  2. Before: [a, strings, stream, of, this, is]
  3. After : [this, is, strings, stream, of, a]

可以看到,两次排序并不相同,因此调用 findFirst 的结果也将有所不同。

在 Java 9 中,新的不可变集合(与映射)是随机的,即使它们每次都以相同的方式初始化,其每次运行的迭代顺序也会发生变化。4

4感谢 Stuart Marks 所做的解释。

findAny 方法要么返回描述流中某个元素的 Optional,要么在流为空时返回一个空 Optional。在本例中,操作的行为具有显式不确定性(explicit nondeterminism),这意味着可以自由选择流中的任何元素,从而实现对并行操作的优化。

为说明这一点,考虑从一个无序的并行整数流中返回任意元素。例 3-49 在随机延迟 100 毫秒后将每个元素映射到其自身,从而引入了一个人为的延迟。

例 3-49 随机延迟后在并行流中使用 findAny 方法

  1. public Integer delay(Integer n) {
  2. try {
  3. Thread.sleep((long) (Math.random() * 100));
  4. } catch (InterruptedException ignored) {
  5. }
  6. return n;
  7. }
  8.  
  9. // 其他代码
  10.  
  11. Optional<Integer> any = Stream.of(3, 1, 4, 1, 5, 9, 2, 6, 5)
  12. .unordered()
  13. .parallel()
  14. .map(this::delay)
  15. .findAny();
  16.  
  17. System.out.println("Any: " + any);

❶ Java 中唯一可以捕获和忽略的异常 5

5严格来说,不应该捕获并忽略任何异常。尽管忽略 InterruptedException 的做法很常见,但这并不是个好主意。

❷ 顺序并不重要

❸ 在并行流中采用通用 fork/join 线程池

❹ 引入随机延迟

❺ 无论出现顺序如何,返回第一个元素

上述代码可以输出任何给定的数字,取决于先执行到哪个线程。

findFirstfindAny 都属于短路终止操作(short-circuiting, terminal operation)。当作用于无限流时,短路操作可能产生一个有限流。如果终止操作在作用于无限输入时也可能在有限时间内终止,它就属于短路操作。

从这一节讨论的示例可以看到,并行(parallelization)在某些情况下非但不会提高性能,反而会降低性能。流是惰性的,它们只处理满足流水线所需的元素。在本例中,由于只要求返回第一个元素,启动 fork/join 线程池显得有些矫枉过正。详见例 3-50。

例 3-50 在顺序流和并行流中使用 findAny 方法

  1. Optional<Integer> any = Stream.of(3, 1, 4, 1, 5, 9, 2, 6, 5)
  2. .unordered()
  3. .map(this::delay)
  4. .findAny();
  5.  
  6. System.out.println("Sequential Any: " + any);
  7.  
  8. any = Stream.of(3, 1, 4, 1, 5, 9, 2, 6, 5)
  9. .unordered()
  10. .parallel()
  11. .map(this::delay)
  12. .findAny();
  13.  
  14. System.out.println("Parallel Any: " + any);

❶ 顺序流(默认)

❷ 并行流

典型的输出如下所示(在一台 8 核计算机上运行,默认使用 8 线程 fork/join 线程池)。6

6这里假定已将延迟方法修改为打印当前线程的名称及其正在处理的值。

对于顺序处理:

  1. main // 顺序处理,因此只有一个线程
  2. Sequential Any: Optional[3]

对于并行处理:

  1. ForkJoinPool.commonPool-worker-1
  2. ForkJoinPool.commonPool-worker-5
  3. ForkJoinPool.commonPool-worker-3
  4. ForkJoinPool.commonPool-worker-6
  5. ForkJoinPool.commonPool-worker-7
  6. main
  7. ForkJoinPool.commonPool-worker-2
  8. ForkJoinPool.commonPool-worker-4
  9. Parallel Any: Optional[1]

顺序流只需访问并返回一个元素,因此属于短路操作。并行流启动 8 个不同的线程,找到一个元素后关闭所有线程。换言之,并行流访问了大量并不需要的值。

请注意,流的出现顺序是一个重要概念。如果流具有出现顺序,findFirst 方法总是会返回同一个值。而 findAny 方法可以返回任意元素,因此更适合在并行操作中使用。

另见

有关惰性流的讨论请参见范例 3.13,有关并行流的讨论请参见第 9 章。