1.4 流
几乎每个Java应用都会制造和处理集合。但集合用起来并不总是那么理想。比方说,你需要从一个列表中筛选金额较高的交易,然后按货币分组。你需要写一大堆模板代码来实现这个数据处理命令,如下所示:
Map<Currency, List<Transaction>> transactionsByCurrencies =new HashMap<>(); ←---- 建立累积交易分组的Mapfor (Transaction transaction : transactions) { ←---- 遍历交易的Listif(transaction.getPrice() > 1000){ ←---- 筛选金额较高的交易Currency currency = transaction.getCurrency(); ←---- 提取交易货币List<Transaction> transactionsForCurrency =transactionsByCurrencies.get(currency);if (transactionsForCurrency == null) { ←---- 如果这个货币的分组Map是空的,那就建立一个transactionsForCurrency = new ArrayList<>();transactionsByCurrencies.put(currency,transactionsForCurrency);}transactionsForCurrency.add(transaction); ←---- 将当前遍历的交易添加到具有同一货币的交易List中}}
此外,很难一眼看出这些代码是做什么的,因为有好几个嵌套的控制流指令。
有了Stream API,你现在可以这样解决这个问题了:
import static java.util.stream.Collectors.groupingBy;Map<Currency, List<Transaction>> transactionsByCurrencies =transactions.stream().filter((Transaction t) -> t.getPrice() > 1000) ←---- 筛选金额较高的交易.collect(groupingBy(Transaction::getCurrency)); ←---- 按货币分组
这看起来有点儿神奇,不过现在先不用担心。第4~7章会专门讲述怎么理解Stream API。现在值得注意的是,Stream API处理数据的方式与Collection API不同。用集合的话,你得自己管理迭代过程。你得用for-each循环一个个地迭代元素,然后再处理元素。我们把这种数据迭代方法称为外部迭代。相反,有了Stream API,你根本用不着操心循环的事情。数据处理完全是在库内部进行的。我们把这种思想叫作内部迭代。第4章还会谈到这些思想。
使用集合的另一个头疼之处是,想想看,要是交易量非常庞大,你要怎么处理这个巨大的列表呢?单个CPU根本搞不定这么大量的数据,但你很可能已经有了一台多核计算机。理想情况下,你可能想让这些CPU核共同分担处理工作,以缩短处理时间。理论上来说,要是你有八个核,那并行起来,处理数据的速度应该是单核的八倍。
多核计算机
所有新的台式机和笔记本电脑都是多核的。它们不是仅有一个CPU,而是有四个、八个,甚至更多CPU,通常称为核6。问题是,经典的Java程序只能利用其中一个核,其他核的处理能力都浪费了。类似地,很多公司利用计算集群(用高速网络连接起来的多台计算机)来高效处理海量数据。Java 8提供了新的编程风格,可更好地利用这样的计算机。
Google的搜索引擎就是一个无法在单台计算机上运行的代码示例。它要读取互联网上的每个页面并建立索引,将每个网页上出现的每个词都映射到包含该词的网址上。然后,如果你用多个词进行搜索,软件就可以快速利用索引,给你一个包含这些词的网页集合。想想看,你会如何在Java中实现这个算法,哪怕是比Google小的引擎也需要你利用计算机上所有的核。
6从某种意义上说,这个名字不太好。一块多核芯片上的每个核都是一个五脏俱全的CPU。但“多核CPU”的说法很流行,所以我们就用核来指代各个CPU。
多线程并非易事
问题在于,通过多线程代码来利用并行(使用先前Java版本中的Thread API)并非易事。你得换一种思路:线程可能会同时访问并更新共享变量。因此,如果没有协调好7 ,那么数据可能会被意外改变。相比一步步执行的顺序模型,这个模型不太好理解8。比如,图1-5就展示了如果没有同步好,两个线程同时向共享变量sum加上一个数时,可能会出现的问题。
7传统上是利用synchronized关键字,但是要是用错了地方,就可能出现很多难以察觉的错误。Java 8基于Stream的并行提倡很少使用synchronized的函数式编程风格,它关注数据分块而不是协调访问。
8啊哈,促使语言发展的一个动力源!

图 1-5 两个线程对共享的sum变量做加法的一种可能方式。结果是105,而不是预想的108
Java 8也用Stream API(java.util.stream)解决了这两个问题:集合处理时的模板化和晦涩,以及难以利用多核。这样设计的第一个原因是,有许多反复出现的数据处理模式,类似于前一节所说的filterApples或SQL等数据库查询语言里熟悉的操作,如果库中有这些就会很方便:根据标准筛选数据(比如较重的苹果),提取数据(例如抽取列表中每个苹果的重量字段),或给数据分组(例如,将一个数字列表分为奇数列表和偶数列表)等。第二个原因是,这类操作常常可以并行。例如,如图1-6所示,在两个CPU上筛选列表,可以让一个CPU处理列表的前一半,另一个CPU处理后一半,这称为分支步骤➊。CPU随后对各自的半个列表做筛选➋。最后➌,一个CPU会将两个结果合并(Google搜索这么快就与此紧密相关,当然用的CPU远远不止两个)。

图 1-6 将filter分支到两个CPU上并合并结果
到这里,我们只是说新的Stream API和Java现有的Collection API的行为差不多,它们都能够访问数据项目的序列。不过,现在最好记住,Collection主要是为了存储和访问数据,Stream则主要用于描述对数据的计算。这里的关键点在于,Stream API允许并提倡并行处理一个Stream中的元素。虽然乍看上去可能有点儿怪,但筛选一个Collection(将上一节的filterApples应用在一个List上)的最快方法常常是将其转换为Stream,进行并行处理,然后再转换回List,下面列举的串行和并行的例子都是如此。我们这里还只是说“几乎免费的并行”,让你稍微体验一下,如何利用Stream和Lambda表达式顺序或并行地从一个列表里筛选比较重的苹果。
顺序处理:
import static java.util.stream.Collectors.toList;List<Apple> heavyApples =inventory.stream().filter((Apple a) -> a.getWeight() > 150).collect(toList());
并行处理:
import static java.util.stream.Collectors.toList;List<Apple> heavyApples =inventory.parallelStream().filter((Apple a) -> a.getWeight() > 150).collect(toList());
Java中的并行与无共享可变状态
大家都说在Java中并行很难,而且和
synchronized相关的“玩意儿”都容易出问题。那Java 8里面有什么“灵丹妙药”呢?事实上有两个。首先,库会负责分块,即把大的流分成几个小的流,以便并行处理。其次,流提供的这个几乎免费的并行,只有在传递给filter之类的库方法的方法不会互动(比方说有可变的共享对象)时才能工作。但是其实这个限制对于程序员来说挺自然的,比如Apple::isGreenApple就是这样。确实,虽然函数式编程中的函数的主要意思是“把函数作为一等值”,但它也常常隐含着第二层意思,即“执行时在元素之间无互动”。
第7章会详细探讨Java 8中的并行数据处理及其特点。在加入所有这些新“玩意儿”改进Java的时候,Java 8设计者发现的一个现实问题就是现有的接口也在改进。比如,Collections.sort方法真的应该属于List接口,但从来没有放在后者里。理想情况下,你会希望做list.sort(comparator),而不是Collections.sort(list, comparator)。这看起来无关紧要,但是在Java 8之前,你可能会更新一个接口,然后发现你把所有实现它的类也给更新了——简直是逻辑灾难!这个问题在Java 8里由默认方法解决了。
