5.8 构建流

希望到现在,我们已经让你相信,流对于表达数据处理查询是非常强大而有用的。到目前为止,你已经能够使用stream方法从集合生成流了。此外,我们还介绍了如何根据数值范围创建数值流。但创建流的方法还有许多!本节将介绍如何从值序列、数组、文件来创建流,甚至由生成函数来创建无限流!

5.8.1 由值创建流

你可以使用静态方法Stream.of,通过显式值创建一个流。它可以接受任意数量的参数。例如,以下代码直接使用Stream.of创建了一个字符串流。然后,你可以将字符串转换为大写,再一个个打印出来:

  1. Stream<String> stream = Stream.of("Modern ", "Java ", "In ", "Action");
  2. stream.map(String::toUpperCase).forEach(System.out::println);

你可以使用empty得到一个空流,如下所示:

  1. Stream<String> emptyStream = Stream.empty();

5.8.2 由可空对象创建流

Java 9提供了一个新方法可以由一个可空对象创建流。使用流的过程中,你可能也碰到过这种情况,即你处理的对象有可能为空,而你又需要把它们转换成流(或者由null构成的空的流)进行处理。譬如,如果对象不存在指定键对应的属性,方法System.getProperty就会返回一个null。为了使用流处理它,你需要显式地检查对象值是否为空,如下所示:

  1. String homeValue = System.getProperty("home");
  2. Stream<String> homeValueStream
  3. = homeValue == null ? Stream.empty() : Stream.of(value);

借助于Stream.ofNullable,这段代码可以改写得更加简洁:

  1. Stream<String> homeValueStream
  2. = Stream.ofNullable(System.getProperty("home"));

这种模式搭配flatMap处理由可空对象构成的流时尤其方便:

  1. Stream<String> values =
  2. Stream.of("config", "home", "user")
  3. .flatMap(key -> Stream.ofNullable(System.getProperty(key)));

5.8.3 由数组创建流

你可以使用静态方法Arrays.stream从数组创建一个流。它接受一个数组作为参数。例如,你可以将一个原始类型int的数组转换成一个IntStream,然后对IntStream求和以生成int,如下所示:

  1. int[] numbers = {2, 3, 5, 7, 11, 13};
  2. int sum = Arrays.stream(numbers).sum(); ←---- 总和是41

5.8.4 由文件生成流

Java中用于处理文件等I/O操作的NIO API(非阻塞 I/O)已更新,以便利用Stream API。java.nio.file.Files中的很多静态方法都会返回一个流。例如,一个很有用的方法是Files.lines,它会返回一个由指定文件中的各行构成的字符串流。使用你迄今所学的内容,你可以用这个方法看看一个文件中有多少各不相同的词:

  1. long uniqueWords = 0;
  2. try(Stream<String> lines =
  3. Files.lines(Paths.get("data.txt"), Charset.defaultCharset())){ ←---- 流会自动关闭,因此不需要执行额外的try-finally操作
  4. uniqueWords = lines.flatMap(line -> Arrays.stream(line.split(" "))) ←---- 生成单词流
  5. .distinct() ←---- 删除重复项
  6. .count(); ←---- 数一数有多少不重复的单词
  7. }
  8. catch(IOException e){ ←---- 如果打开文件时出现异常则加以处理
  9. }

你可以使用Files.lines得到一个流,其中的每个元素都是给定文件中的一行。因为流的源头是一个I/O资源,所以这个调用环绕在一个try/catch块中。事实上,调用Files.lines会打开一个I/O资源,这些I/O资源使用完毕后必须被关闭,否则会发生资源泄漏。在过去,你需要显式地声明一个finally块来完成这些回收工作。Stream接口通过实现AutoCloseable接口,很方便地替大家解决了这一问题。这意味着资源的管理都由try代码块全权负责了。一旦你接收到line构成的流,就可以调用linesplit方法,将行拆分成单词。请特别留意,flatMap是如何生成一个扁平单词流的,而不是生成多个流,每一行一个单词流。最后,我们通过串接distinctcount方法,统计了流中有多少不重复的单词。

5.8.5 由函数生成流:创建无限流

Stream API提供了两个静态方法来从函数生成流:Stream.iterateStream.generate。这两个操作可以创建所谓的无限流:不像从固定集合创建的流那样有固定大小的流。由iterategenerate产生的流会用给定的函数按需创建值,因此可以无穷无尽地计算下去!一般来说,应该使用limit(n)来对这种流加以限制,以避免打印无穷多个值。

  • 迭代

我们先来看一个iterate的简单例子,然后再解释:

  1. Stream.iterate(0, n -> n + 2)
  2. .limit(10)
  3. .forEach(System.out::println);

iterate方法接受一个初始值(在这里是0),还有一个依次应用在每个产生的新值上的Lambda(UnaryOperator类型)。这里,使用Lambda n -> n + 2,返回的是前一个元素加上2。因此,iterate方法生成了一个所有正偶数的流:流的第一个元素是初始值0。然后加上2来生成新的值2,再加上2来得到新的值4,以此类推。这种iterate操作基本上是顺序的,因为结果取决于前一次应用。请注意,此操作将生成一个无限流——这个流没有结尾,因为值是按需计算的,可以永远计算下去。我们说这个流是无界的。正如前面所讨论的,这是流和集合之间的一个关键区别。我们使用limit方法来显式限制流的大小。这里只选择了前10个偶数。然后可以调用forEach终端操作来消费流,并分别打印每个元素。

一般来说,在需要依次生成一系列值的时候应该使用iterate,比如一系列日期:1月31日,2月1日,以此类推。来看一个难一点儿的应用iterate的例子,试试测验5.4。

测验5.4:斐波那契元组序列

斐波那契数列是著名的经典编程练习。下面这个数列就是斐波那契数列的一部分:0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55…数列中开始的两个数字是0和1,后续的每个数字都是前两个数字之和。

斐波那契元组序列与此类似,是数列中数字和其后续数字组成的元组构成的序列:(0, 1), (1, 1), (1, 2), (2, 3), (3, 5), (5, 8), (8, 13), (13, 21) …

你的任务是用iterate方法生成斐波那契元组序列中的前20个元素。

让我们帮你入手吧。第一个问题是,iterate方法要接受一个UnaryOperator作为参数,而你需要一个像(0,1)这样的元组流。你还是可以(这次又是比较草率地)使用一个数组的两个元素来代表元组。例如,new int[]{0,1}就代表了斐波那契序列(0, 1)中的第一个元素。这就是iterate方法的初始值:

  1. Stream.iterate(new int[]{0, 1}, ???)
  2. .limit(20)
  3. .forEach(t -> System.out.println("(" + t[0] + "," + t[1] +")"));

在这个测验中,你需要搞清楚???代表的代码是什么。请记住,iterate会按顺序应用给定的Lambda。

答案

  1. Stream.iterate(new int[]{0, 1},
  2. t -> new int[]{t[1], t[0]+t[1]})
  3. .limit(20)
  4. .forEach(t -> System.out.println("(" + t[0] + "," + t[1] +")"));

它是如何工作的呢?iterate需要一个Lambda来确定后续的元素。对于元组(3, 5),其后续元素是(5, 3+5) = (5, 8)。下一个是(8, 5+8)。看到这个模式了吗?给定一个元组,其后续的元素是(t[1],t[0]+t[1])。这可以用这个Lambda来计算:t->new int[]{t[1], t[0]+t[1]}。运行这段代码,你就得到了序列(0, 1), (1, 1), (1, 2), (2, 3), (3, 5), (5, 8), (8, 13), (13, 21)…请注意,如果你只想打印正常的斐波那契数列,可以使用map提取每个元组中的第一个元素:

  1. Stream.iterate(new int[]{0, 1},
  2. t -> new int[]{t[1],t[0] + t[1]})
  3. .limit(10)
  4. .map(t -> t[0])
  5. .forEach(System.out::println);

这段代码将生成斐波那契数列:0, 1, 1, 2, 3, 5, 8, 13, 21, 34…

Java 9对iterate方法进行了增强,它现在可以支持谓词操作了。譬如,你可以由0开始生成一个数字序列,一旦数字大于100就停下来:

  1. IntStream.iterate(0, n -> n < 100, n -> n + 4)
  2. .forEach(System.out::println);

iterate方法的第二个参数是一个谓词,它决定了迭代调用何时终止。注意,你可能会想,使用filter操作完全能实现同样的效果:

  1. IntStream.iterate(0, n -> n + 4)
  2. .filter(n -> n < 100)
  3. .forEach(System.out::println);

非常不幸,事实并非如此。实际上,这段代码根本停不下来!原因在于,filter根本无法了解数字是否需要持续递增,因此它只能不停地执行过滤操作!你可以使用takeWhile解决这个问题,它能对流执行短路操作:

  1. IntStream.iterate(0, n -> n + 4)
  2. .takeWhile(n -> n < 100)
  3. .forEach(System.out::println);

然而,你不得不承认iterate结合谓词要简洁得多!

  • 生成

iterate方法类似,generate方法也可让你按需生成一个无限流。但generate不是依次对每个新生成的值应用函数的。它接受一个Supplier类型的Lambda提供新的值。先来看一个简单的用法:

  1. Stream.generate(Math::random)
  2. .limit(5)
  3. .forEach(System.out::println);

这段代码将生成一个流,其中有五个0到1之间的随机双精度数。例如,运行一次得到了下面的结果:

  1. 0.9410810294106129
  2. 0.6586270755634592
  3. 0.9592859117266873
  4. 0.13743396659487006
  5. 0.3942776037651241

Math.Random静态方法被用作新值生成器。同样,你可以用limit方法显式限制流的大小,否则流将会无限长。

你可能想知道,generate方法还有什么用途。我们使用的供应源(指向Math.random的方法引用)是无状态的:它不会在任何地方记录任何值,以备以后计算使用。但供应源不一定是无状态的。你可以创建存储状态的供应源,它可以修改状态,并在为流生成下一个值时使用。举个例子,我们将展示如何利用generate创建测验5.4中的斐波那契数列,这样你就可以和用iterate方法的办法比较一下。但很重要的一点是,在并行代码中使用有状态的供应源是不安全的。为了内容完整,本章结尾处介绍了斐波那契的有状态的intsupplier,但通常应尽量避免使用!第7章会进一步讨论这个操作的问题和副作用,以及并行流。

我们在这个例子中会使用IntStream说明避免装箱操作的代码。IntStreamgenerate方法会接受一个IntSupplier,而不是Supplier。例如,可以这样来生成一个全是1的无限流:

  1. IntStream ones = IntStream.generate(() -> 1);

你在第3章中已经看到,Lambda允许你创建函数式接口的实例,只要直接内联提供方法的实现就可以。你也可以像下面这样,通过实现IntSupplier接口中定义的getAsInt方法显式传递一个对象(虽然这看起来是无缘无故地绕圈子,也请你耐心看):

  1. IntStream twos = IntStream.generate(new IntSupplier(){
  2. public int getAsInt(){
  3. return 2;
  4. }
  5. });

generate方法将使用给定的供应源,并反复调用getAsInt方法,而这个方法总是返回2。但这里使用的匿名类和Lambda的区别在于,匿名类可以通过字段定义状态,而状态又可以用getAsInt方法来修改。这是一个副作用的例子。你迄今见过的所有Lambda都是没有副作用的,它们没有改变任何状态。

回到斐波那契数列的任务上,你现在需要做的是建立一个IntSupplier,它要把前一项的值保存在状态中,以便getAsInt用它来计算下一项。此外,在下一次调用它的时候,还要更新IntSupplier的状态。下面的代码就是如何创建一个在调用时返回下一个斐波那契项的IntSupplier

  1. IntSupplier fib = new IntSupplier(){
  2. private int previous = 0;
  3. private int current = 1;
  4. public int getAsInt(){
  5. int oldPrevious = this.previous;
  6. int nextValue = this.previous + this.current;
  7. this.previous = this.current;
  8. this.current = nextValue;
  9. return oldPrevious;
  10. }
  11. };
  12. IntStream.generate(fib).limit(10).forEach(System.out::println);

前面的代码创建了一个IntSupplier的实例。此对象有可变的状态:它在两个实例变量中记录了前一个斐波那契项和当前的斐波那契项。getAsInt在调用时会改变对象的状态,由此在每次调用时产生新的值。相比之下,使用iterate的方法则是纯粹不变的:它没有修改现有状态,但在每次迭代时会创建新的元组。你将在第7章了解到,你应该始终采用不变的方法,以便并行处理流,并保持结果正确。

请注意,因为你处理的是一个无限流,所以必须使用limit操作来显式限制它的大小。否则,终端操作(这里是forEach)将永远计算下去。同样,你不能对无限流做排序或归约,因为所有元素都需要处理,而这永远也完不成!