7.3 Spliterator

Spliterator是Java 8中加入的另一个新接口,这个名字代表“可分迭代器”(splitable iterator)。和Iterator一样,Spliterator也用于遍历数据源中的元素,但它是为了并行执行而设计的。虽然在实践中可能用不着自己开发Spliterator,但了解一下它的实现方式会让你对并行流的工作原理有更深入的了解。Java 8已经为集合框架中包含的所有数据结构提供了一个默认的Spliterator实现。集合实现了Spliterator接口,接口提供了一个默认的spliterator()方法(你将会在第13章中学到关于默认方法的更多信息)。这个接口定义了若干方法,如下面的代码清单所示。

代码清单 7-3 Spliterator接口

  1. public interface Spliterator<T> {
  2. boolean tryAdvance(Consumer<? super T> action);
  3. Spliterator<T> trySplit();
  4. long estimateSize();
  5. int characteristics();
  6. }

与往常一样,TSpliterator遍历的元素的类型。tryAdvance方法的行为类似于普通的Iterator,因为它会按顺序一个一个使用Spliterator中的元素,并且如果还有其他元素要遍历就返回true。但trySplit是专为Spliterator接口设计的,因为它可以把一些元素划出去分给第二个Spliterator(由该方法返回),让它们两个并行处理。Spliterator还可通过estimateSize方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值也有助于让拆分均匀一点。

重要的是,要了解这个拆分过程在内部是如何执行的,以便在需要时能够掌控它。因此,下一节会详细地分析它。

7.3.1 拆分过程

将Stream拆分成多个部分的算法是一个递归过程,如图7-6所示。第一步是对第一个Spliterator调用trySplit,生成第二个Spliterator。第二步是对这两个Spliterator调用trysplit,这样总共就有了四个Spliterator。这个框架不断对Spliterator调用trySplit直到它返回null,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的Spliterator在调用trySplit时都返回了null

7.3 Spliterator - 图1

图 7-6 递归拆分过程

这个拆分过程也受Spliterator本身的特性影响,而特性是通过characteristics方法声明的。

Spliterator的特性

Spliterator接口声明的最后一个抽象方法是characteristics,它将返回一个int,代表Spliterator本身特性集的编码。使用Spliterator的客户可以用这些特性来更好地控制和优化它的使用。表7-2总结了这些特性(不幸的是,虽然它们在概念上与收集器的特性有重叠,编码却不一样)。这些特性是在Spliterator接口中定义的int常量。

表 7-2 Spliterator的特性

特性 含义
ORDERED 元素有既定的顺序(例如List),因此Spliterator在遍历和划分时也会遵循这一顺序
DISTINCT 对于任意一对遍历过的元素xyx.equals(y)返回false
SORTED 遍历的元素按照一个预定义的顺序排序
SIZED Spliterator由一个已知大小的源建立(例如Set),因此estimatedSize()返回的是准确值
NON-NULL 保证遍历的元素不会为null
IMMUTABLE Spliterator的数据源不能修改。这意味着在遍历时不能添加、删除或修改任何元素
CONCURRENT Spliterator的数据源可以被其他线程同时修改而无须同步
SUBSIZED Spliterator和所有从它拆分出来的Spliterator都是SIZED

现在你已经看到了Spliterator接口是什么以及它定义了哪些方法,你可以试着自己实现一个Spliterator了。

7.3.2 实现你自己的Spliterator

下面来看一个可能需要你自己实现Spliterator的实际例子。我们要开发一个简单的方法来数数一个String中的单词数。这个方法的一个迭代版本可以写成下面的样子。

代码清单 7-4 一个迭代式词数统计方法

  1. public int countWordsIteratively(String s) {
  2. int counter = 0;
  3. boolean lastSpace = true;
  4. for (char c : s.toCharArray()) { ←---- 逐个遍历String中的所有字符
  5. if (Character.isWhitespace(c)) {
  6. lastSpace = true;
  7. } else {
  8. if (lastSpace) counter++; ←---- 上一个字符是空格,而当前遍历的字符不是空格时,将单词计数器加一
  9. lastSpace = false;
  10. }
  11. }
  12. return counter;
  13. }

把这个方法用在但丁的《神曲》的《地狱篇》的第一句话上:

  1. final String SENTENCE =
  2. " Nel mezzo del cammin di nostra vita " +
  3. "mi ritrovai in una selva oscura" +
  4. " ché la dritta via era smarrita ";
  5. System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");

请注意,我们在句子里添加了一些额外的随机空格,以演示这个迭代实现即使在两个词之间存在多个空格时也能正常工作。正如我们所料,这段代码将打印以下内容:

  1. Found 19 words

理想情况下,你会想要用更为函数式的风格来实现它,因为就像前面说过的,这样你就可以用并行Stream来并行化这个过程,而无须显式地处理线程和同步问题。

  • 以函数式风格重写单词计数器

首先你需要把String转换成一个流。不幸的是,原始类型的流仅限于intlongdouble,所以你只能用Stream

  1. Stream<Character> stream = IntStream.range(0, SENTENCE.length())
  2. .mapToObj(SENTENCE::charAt);

你可以对这个流做归约来计算字数。在归约流时,你得保留由两个变量组成的状态:一个int用来计算到目前为止数过的字数,还有一个boolean用来记得上一个遇到的Character是不是空格。因为Java没有元组(tuple,用来表示由异类元素组成的有序列表的结构,不需要包装对象),所以你必须创建一个新类WordCounter来把这个状态封装起来,如下所示。

代码清单 7-5 用来在遍历Character流时计数的类

  1. class WordCounter {
  2. private final int counter;
  3. private final boolean lastSpace;
  4. public WordCounter(int counter, boolean lastSpace) {
  5. this.counter = counter;
  6. this.lastSpace = lastSpace;
  7. }
  8. public WordCounter accumulate(Character c) { ←---- 和迭代算法一样,accumulate方法一个个遍历Character
  9. if (Character.isWhitespace(c)) {
  10. return lastSpace ?
  11. this :
  12. new WordCounter(counter, true);
  13. } else {
  14. return lastSpace ?
  15. new WordCounter(counter + 1, false) : ←---- 上一个字符是空格,而当前遍历的字符不是空格时,将单词计数器加一
  16. this;
  17. }
  18. }
  19. public WordCounter combine(WordCounter wordCounter) { ←---- 合并两个WordCounter,把其计数器加起来
  20. return new WordCounter(counter + wordCounter.counter,
  21. wordCounter.lastSpace); ←---- 仅需要计数器的总和,无须关心lastSpace
  22. }
  23. public int getCounter() {
  24. return counter;
  25. }
  26. }

这段代码中,accumulate方法定义了如何更改WordCounter的状态,或者更确切地说是用哪个状态来建立新的WordCounter,因为这个类是不可变的。理解这一点非常重要。我们特意采用了一个不可变类来收集状态信息,以便在接下来的步骤中能并发地进行处理。每次遍历到Stream中的一个新的Character时,就会调用accumulate方法。具体来说,就像代码清单7-4中的countWordsIteratively方法一样,当上一个字符是空格,新字符不是空格时,计数器就加一。图7-7展示了accumulate方法遍历到新的Character时,WordCounter的状态转换。调用第二个方法combine时,会对作用于Character流的两个不同子部分的两个WordCounter的部分结果进行汇总,也就是把两个WordCounter内部的计数器加起来。

7.3 Spliterator - 图2

图 7-7 遍历到新的Character cWordCounter的状态转换

现在你已经写好了在WordCounter中累计字符,以及在WordCounter中把它们结合起来的逻辑,那写一个方法来归约Character流就很简单了:

  1. private int countWords(Stream<Character> stream) {
  2. WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
  3. WordCounter::accumulate,
  4. WordCounter::combine);
  5. return wordCounter.getCounter();
  6. }

现在你就可以试一试这个方法,给它由包含但丁的《神曲》中《地狱篇》第一句的String创建的流:

  1. Stream<Character> stream = IntStream.range(0, SENTENCE.length())
  2. .mapToObj(SENTENCE::charAt);
  3. System.out.println("Found " + countWords(stream) + " words");

你可以和迭代版本比较一下输出:

  1. Found 19 words

到现在为止都很好,但我们以函数式实现WordCounter的主要原因之一就是能轻松地并行处理,来看看具体是如何实现的。

  • WordCounter并行工作

你可以尝试用并行流来加快字数统计,如下所示:

  1. System.out.println("Found " + countWords(stream.parallel()) + " words");

不幸的是,这次的输出是:

  1. Found 25 words

显然有些不对,可到底是哪里不对呢?问题的根源并不难找。因为原始的String在任意位置拆分,所以有时一个词会被分为两个词,然后数了两次。这就说明,拆分流会影响结果,而把顺序流换成并行流就可能使结果出错。

如何解决这个问题呢?解决方案就是要确保String不是在随机位置拆开的,而只能在词尾拆开。要做到这一点,你必须为Character实现一个Spliterator,它只能在两个词之间拆开String(如下所示),然后由此创建并行流。

代码清单 7-6 WordCounterSpliterator

  1. class WordCounterSpliterator implements Spliterator<Character> {
  2. private final String string;
  3. private int currentChar = 0;
  4. public WordCounterSpliterator(String string) {
  5. this.string = string;
  6. }
  7. @Override
  8. public boolean tryAdvance(Consumer<? super Character> action) {
  9. action.accept(string.charAt(currentChar++)); ←---- 处理当前字符
  10. return currentChar < string.length(); ←---- 如果还有字符要处理,则返回true
  11. }
  12. @Override
  13. public Spliterator<Character> trySplit() {
  14. int currentSize = string.length() - currentChar;
  15. if (currentSize < 10) {
  16. return null; ←---- 返回null表示要解析的String已经足够小,可以顺序处理
  17. }
  18. for (int splitPos = currentSize / 2 + currentChar;
  19. splitPos < string.length(); splitPos++) { ←---- 将试探拆分位置设定为要解析的String的中间
  20. if (Character.isWhitespace(string.charAt(splitPos))) { ←---- 让拆分位置前进直到下一个空格
  21. Spliterator<Character> spliterator = ←---- 创建一个新WordCounterSpliterator来解析String从开始到拆分位置的部分
  22. new WordCounterSpliterator(string.substring(currentChar,
  23. splitPos));
  24. currentChar = splitPos; ←---- 将这个WordCounterSpliterator的起始位置设为拆分位置
  25. return spliterator; ←---- 发现一个空格并创建了新的Spliterator,所以退出循环
  26. }
  27. }
  28. return null;
  29. }
  30. @Override
  31. public long estimateSize() {
  32. return string.length() - currentChar;
  33. }
  34. @Override
  35. public int characteristics() {
  36. return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
  37. }
  38. }

这个Spliterator由要解析的String创建,并遍历了其中的Character,同时保存了当前正在遍历的索引。让我们快速回顾一下实现了Spliterator接口的WordCounterSpliterator中的各个函数。

  • tryAdvance方法把String中当前索引位置的Character传给了Consumer,并让位置加一。作为参数传递的Consumer是一个Java内部类,在遍历流时将要处理的Character传给了一系列要对其执行的函数。这里只有一个归约函数,即WordCounter类的accumulate方法。如果新的指针位置小于String的总长,且还有要遍历的Character,则tryAdvance返回true
  • trySplit方法是Spliterator中最重要的一个方法,因为它定义了拆分要遍历的数据结构的逻辑。就像在代码清单7-1中实现的RecursiveTaskcompute方法一样(分支/合并框架的使用方式),首先要设定不再进一步拆分的下限。这里用了一个非常低的下限——10个Character,仅仅是为了保证程序会对那个比较短的String做几次拆分。在实际应用中,就像分支/合并的例子那样,你肯定要用更高的下限来避免生成太多的任务。如果剩余的Character数量低于下限,你就返回null表示无须进一步拆分。相反,如果你需要执行拆分,就把试探的拆分位置设在要解析的String块的中间。但我们没有直接使用这个拆分位置,因为要避免把词在中间断开,于是就往前找,直到找到一个空格。一旦找到了适当的拆分位置,就可以创建一个新的Spliterator来遍历从当前位置到拆分位置的子串。把当前位置this设为拆分位置,因为之前的部分将由新的Spliterator来处理,最后返回。
  • 还需要遍历的元素的estimatedSize就是这个Spliterator解析的String的总长度和当前遍历的位置的差。
  • 最后,characteristic方法告诉框架这个SpliteratorORDERED(顺序就是String中各个Character的次序)、SIZEDestimatedSize方法的返回值是精确的)、SUBSIZEDtrySplit方法创建的其他Spliterator也有确切大小)、NON-NULLString中不能有为nullCharacter)和IMMUTABLE(在解析String时不能再添加Character,因为String本身是一个不可变类)的。
    • 运用WordCounterSpliterator

现在就可以用这个新的WordCounterSpliterator来处理并行流了,如下所示:

  1. Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
  2. Stream<Character> stream = StreamSupport.stream(spliterator, true);

传给StreamSupport.stream工厂方法的第二个布尔参数意味着你想创建一个并行流。把这个并行流传给countWords方法:

  1. System.out.println("Found " + countWords(stream) + " words");

可以得到意料之中的正确输出:

  1. Found 19 words

你已经看到了Spliterator如何让你控制拆分数据结构的策略。Spliterator还有最后一个值得注意的功能,就是可以在第一次遍历、第一次拆分或第一次查询估计大小时绑定元素的数据源,而不是在创建时就绑定。这种情况下,它称为延迟绑定(late-binding)的Spliterator。我们专门用附录C来展示如何开发一个工具类来利用这个功能在同一个流上执行多个操作。