8.1 流的简介

流就是一个数据序列(并不是一种数据结构),可以以顺序方式或者并发方式应用某一操作序列来筛选、转换、排序、约简(reduce)或组织这些元素,以获得某一最终对象。例如,如果有一个含有员工数据的流,可以使用该流。

  • 统计员工的总数(这是一项开销较大的末端操作)。
  • 计算在某个区域居住的所有员工的平均薪酬。
  • 获取未达到目标的员工列表。
  • 执行任何涉及全部或部分员工的操作。

流在很大程度上受函数式编程(Scala编程语言提供了一种非常相似的机制)的影响,可与lambda表达式一起使用。流API类似于C#语言中的LINQ(Language-Integrated Query的缩写)查询,在某种程度上,还可与SQL查询相比较。

接下来将解释流的基本特征,以及流的各个组成部分。

8.1.1 流的基本特征

流的主要特征如下。

  • 流并不存储其元素。流从它的源获取元素,并且推送这些元素通过构成管道的所有操作。
  • 可以以并行方式处理流而无须做任何额外工作。创建流时,可以使用stream()方法创建一个顺序流,或者使用parallelStream()方法创建一个并行流。BaseStream接口定义了sequential()方法以获取顺序流,也定义了parallel()方法以获取并行流。顺序流与并行流可以互相转换,并且不限次数。需要考虑的是,末端的流操作执行完毕时,所有的流操作都将按照最后一次设置进行处理。无法命令流去顺序执行一些操作,并发执行另一些操作。从内部来看,Oracle JDK 9和Open JDK 9中的并行流采用了Fork/Join框架的一种实现来执行并发操作。
  • 流受函数式编程和Scala编程语言的影响很大。你可以使用新的lambda表达式作为定义算法的方式,这样的算法在针对流的操作中执行。
  • 流不可重用。例如,从某个值列表中获得一个流时,该流只能使用一次。如果要在同样的数据之上执行另一操作,那么需要创建另一个流。
  • 流可对数据做延迟处理。除非必要,否则流并不会获取数据。正如稍后将学到的,每个流都有一个初始源,有一些中间操作,还有一个末端操作。只有末端操作需要时数据才会被处理,因此流的处理直到执行末端操作时才会开始。
  • 不能以不同方式访问流的元素。采用某种数据结构时,可以访问其中存储的某个特定元素,例如指明它的位置或者键。流操作通常对元素做统一处理,因此你有的只有元素本身。你无法知道元素在流中的位置及其相邻元素。对于并行流而言,可以以任何顺序处理元素。
  • 流操作并不允许修改流的源。例如,如果使用一个列表作为流的源,那么可以将处理结果存放在新列表中,但是不可以添加、删除或者替换初始列表中的元素。尽管听起来很受限制,但是这一特性也非常有用,因为返回从内部Collection创建的流时不用担心该列表会被调用者修改。

8.1.2 流的组成部分

流有三个不同的部分。

  • 生成供流使用的数据的来源。
  • 0个或者多个中间操作,这些操作产生另一个流作为输出。
  • 生成对象的末端操作,该对象可以是一个简单对象,也可以是一个类似数组、列表或者哈希表的Collection。也可以存在不产生任何显式结果的末端操作。

  • 流的来源

流的来源可产生将由Stream对象处理的数据。可从多个数据源创建一个流。例如在Java 8中,Collection接口包括了生成顺序流的stream()方法,以及生成并行流的parallelStream()方法。这样生成的流所能处理的数据可以来自几乎所有在Java中实现的数据结构,例如列表(ArrayListLinkedList等)、集合(HashSetEnumSet),甚至并发数据结构(LinkedBloFmackingDequePriorityBlockingQueue等)。另一种可以生成流的数据结构是数组。Array类含有四种版本可从数组产生流的stream()方法。如果将一个int数值型数组传递给该方法,它将生成IntStream。这实现的是一种特殊的流,用于处理整型数值(你依然可以使用Stream替代IntStream,但是性能可能会比较差)。

与此类似,还可以从long[]数组或double[]数组创建LongStream或者DoubleStream。当然,如果向stream()方法传递一个对象数组,将获得一个同样类型的通用流。在本例中并没有parallelStream()方法,但是一旦获得了该流,就可以调用由BaseStream接口定义的parallel()方法,将顺序流转换成为并发流。

流API还提供了另一个有用的功能,即可以生成流并且按照流的方式处理目录或者文件中的内容。File类提供了多种使用流处理文件的方法。例如,find()方法返回一个含有Path对象的流,其中含有文件树中满足特定条件的文件。list()方法返回一个Path对象流,其中含有关于某个目录的内容。walk()方法返回一个Path对象流,使用深度优先算法处理目录树中的所有对象。但是最有用的方法是lines()方法,它创建了一个String对象流,其中含有文件的各个行,这样就可以使用一个流处理文件的内容。遗憾的是,以上提到的方法在并行处理时性能都很糟糕,除非有成千上万的元素(文件或者行)。

同样,可以使用Stream接口提供的两个方法来创建流,即generate()方法和iterate()方法。generate()方法接收由某一对象类型参数化的Supplier作为参数,生成该类型对象的一个无限顺序流。Supplier接口中含有get()方法。每当流需要一个新的对象时,就会调用该方法来获取流的下一个值。如前所述,流以一种延迟方式处理数据。因此毫无疑问,流本质上就是无限的。你可以使用其他方法转换该无限流。iterate()方法与之类似,但是对于这种情况,该方法会接收一个种子和一个UnaryOperator。第一个值是将UnaryOperator应用于该种子的结果,第二个值是将UnaryOperator应用于第一个结果所产生的结果,以此类推。由于性能方面的问题,在并发应用程序中应该尽可能避免使用该方法。

还有更多流的源,如下所示。

  • String.chars():它返回一个IntStream,其中含有Stringchar值。
  • Random.ints()Random.doubles()或者Random.longs():这些方法分别返回IntStreamDoubleStreamLongStream,其中分别带有各自类型的伪随机值。你可以指定随机数的数值范围,或者你想要获得的随机值的个数。例如,你可以使用new Random.ints(10,20)来生成10到20之间的伪随机数。
  • SplittableRandom:该类提供了与Random类相同的方法,可生成intdoublelong型的伪随机值,但是该类更适合用于并行处理。可以查看Java API文档了解该类的详细情况。
  • Stream.concat()方法:该类接收两个流作为参数,并且创建出一个新的流,将第二个流的元素接在第一个流的元素的后面。
    还可以用其他一些源生成流,但是我们认为那些来源并不重要。
  • 中间操作

中间操作最重要的特征在于它将另一个流作为结果返回。输入流和输出流的对象可以是不同类型的,但是中间操作总可以生成新流。一个流可以有0个或者多个中间操作。Stream接口提供的最重要的中间操作是如下几项。

  • distinct():该方法返回一个含有唯一值的流,所有重复元素都将被去除。
  • filter():该方法返回一个含有满足特定标准的元素的流。
  • flatMap():该方法用于将一个关于流的流(例如一个关于列表、集合等的流)转换成单个流。
  • limit():该方法返回一个流,其中最多包含指定数目的原始元素,从第一个元素起按照相遇顺序选取。
  • map():该方法用于将流的元素从一种类型转换成另一种类型。
  • peek():该方法返回相同的流,只是需要执行一些代码,通常用于记录日志信息。
  • skip():该方法忽略了流的前若干个元素(具体数值以参数方式传递)。
  • sorted():该方法对流的元素进行排序。
    • 末端操作

末端操作将某个对象作为结果返回,而绝不会返回一个流。一般来说,所有流都会以一个末端操作结束,而该末端操作返回的是整个操作序列的最终结果。最重要的末端操作有如下几项。

  • collect():该方法提供了一种约简源流中元素数目的方法,以某种数据结构组织该流的元素。例如,你可以按照任何标准对流的元素进行分组。
  • count():该方法返回流的元素数目。
  • max():该方法返回流的最大元素。
  • min():该方法返回流的最小元素。
  • reduce():该方法将流的元素转换为一个表示该流的唯一对象。
  • forEach()/forEachOrdered():这两个方法将某项操作应用到流的每个元素上。如果流已经有了定义好的顺序,那么第二个方法就会使用该流元素的顺序。
  • findFirst()/findAny():如果要找的元素存在,这两个方法分别返回1或者流的第一个元素。
  • anyMatch()/allMatch()/noneMatch():它们接收一个谓词作为参数,返回一个布尔值来表明流中是否有任意、全部或者没有元素能够匹配该谓词。
  • toArray():该方法返回一个含有流的元素的数组。

8.1.3 MapReduce与MapCollect

MapReduce是一种编程模型,用于在由大量以集群方式工作的机器构成的分布式环境中处理超大规模数据集。它有两个步骤,通常通过以下两个方法实现。

  • Map:这一步对数据进行筛选和转换。
  • Reduce:这一步对数据应用汇总操作。

为了在分布式环境中执行该操作,必须分割数据,然后将其分发到集群中的各台机器上。该编程模型在函数式编程领域已经使用很长时间了。Google近期基于该原理设计了一种新的框架,而且在Apache基金会中,Hadoop项目作为该模型的开源实现广受欢迎。

Java 9提供的流操作允许编程人员实现与此非常类似的结果。Stream接口定义了可以视为映射函数的中间操作(map()filter()sorted()skip()等),而且提供了reduce()方法作为末端操作,其目的是像MapReduce模型的约简操作那样对流的元素进行约简。

约简操作的主要思想是基于前面的中间结果和流元素创建一个新的中间结果。约简的替代方法(也称为可变约简)是将新的结果项整合到可变容器中(例如将其添加到ArrayList)。这种类型的约简通过collect()操作执行,因而称之为MapCollect模型。

本章将介绍如何使用MapReduce模型,第9章将介绍如何使用MapCollect模型。