C.1 复制流

要达到在一个流上并发地执行多个操作的效果,你需要做的第一件事就是创建一个StreamForker,这个StreamForker会对原始的流进行封装,在此基础之上你可以继续定义你希望执行的各种操作。看看下面这段代码。

代码清单 C-1 定义一个StreamForker,在一个流上执行多个操作

  1. public class StreamForker<T> {
  2. private final Stream<T> stream;
  3. private final Map<Object, Function<Stream<T>, ?>> forks =
  4. new HashMap<>();
  5. public StreamForker(Stream<T> stream) {
  6. this.stream = stream;
  7. }
  8. public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
  9. forks.put(key, f); ←---- 使用一个键对流上的函数进行索引
  10. return this; ←---- 返回this从而保证多次流畅地调用fork方法
  11. }
  12. public Results getResults() {
  13. // 功能待实现
  14. }
  15. }

这里的fork方法接受两个参数。

  • Function参数,它对流进行处理,将流转变为代表这些操作结果的任何类型。
  • key参数,通过它你可以取得操作的结果,并将这些键/函数对累积到一个内部的Map中。 fork方法返回StreamForker自身,因此,你可以通过复制多个操作构造一个流水线。图C-1展示了StreamForker背后的主要思想。

C.1 复制流 - 图1

图 C-1 StreamForker详解

这里用户定义了希望在流上执行的三种操作,这三种操作通过三个键索引标识。StreamForker会遍历原始的流,并创建它的三个副本。这时就可以并行地在复制的流上执行这三种操作,这些函数运行的结果由对应的键进行索引,最终会填入到结果的Map

所有由fork方法添加的操作的执行都是通过getResults方法的调用触发的,该方法返回一个Results接口的实现,具体的定义如下:

  1. public static interface Results {
  2. public <R> R get(Object key);
  3. }

这一接口只有一个方法,你可以将fork方法中使用的key对象作为参数传入,方法会返回该键对应的操作结果。

C.1.1 使用ForkingStreamConsumer实现Results接口

你可以用下面的方式实现getResults方法:

  1. public Results getResults() {
  2. ForkingStreamConsumer<T> consumer = build();
  3. try {
  4. stream.sequential().forEach(consumer);
  5. } finally {
  6. consumer.finish();
  7. }
  8. return consumer;
  9. }

ForkingStreamConsumer同时实现了前面定义的Results接口和Consumer接口。随着进一步剖析它的实现细节,你会看到它主要的任务就是处理流中的元素,将它们分发到多个BlockingQueues中处理,BlockingQueues的数量和通过fork方法提交的操作数是一致的。注意,我们很明确地知道流是顺序处理的,不过,如果你在一个并发流上执行forEach方法,它的元素可能就不是顺序地被插入到队列中了。finish方法会在队列的末尾插入特殊元素表明该队列已经没有更多需要处理的元素了。build方法主要用于创建ForkingStreamConsumer,详细内容请参考下面的代码清单。

代码清单 C-2 使用build方法创建ForkingStreamConsumer

  1. private ForkingStreamConsumer<T> build() {
  2. List<BlockingQueue<T>> queues = new ArrayList<>(); ←---- 创建由队列组成的列表,每一个队列对应一个操作
  3. Map<Object, Future<?>> actions = ←---- 建立用于标识操作的键与包含操作结果的Future之间的映射关系
  4. forks.entrySet().stream().reduce(
  5. new HashMap<Object, Future<?>>(),
  6. (map, e) -> {
  7. map.put(e.getKey(),
  8. getOperationResult(queues, e.getValue()));
  9. return map;
  10. },
  11. (m1, m2) -> {
  12. m1.putAll(m2);
  13. return m1;
  14. });
  15. return new ForkingStreamConsumer<>(queues, actions);
  16. }

代码清单C-2中,你首先创建了前面提到的由BlockingQueues组成的列表。紧接着,你创建了一个MapMap的键就是你在流中用于标识不同操作的键,值包含在Future中,Future中包含了这些操作对应的处理结果。BlockingQueues的列表和Future组成的Map会被传递给ForkingStreamConsumer的构造函数。每个Future都是通过getOperationResult方法创建的,代码清单如下。

代码清单 C-3 使用getOperationResult方法创建Future

  1. private Future<?> getOperationResult(List<BlockingQueue<T>> queues,
  2. Function<Stream<T>, ?> f) {
  3. BlockingQueue<T> queue = new LinkedBlockingQueue<>();
  4. queues.add(queue); ←---- 创建一个队列,并将其添加到队列的列表中
  5. Spliterator<T> spliterator = new BlockingQueueSpliterator<>(queue); ←---- 创建一个Spliterator,遍历队列中的元素
  6. Stream<T> source = StreamSupport.stream(spliterator, false); ←---- 创建一个流,将Spliterator作为数据源
  7. return CompletableFuture.supplyAsync( () -> f.apply(source) ); ←---- 创建一个Future对象,以异步方式计算在流上执行特定函数的结果
  8. }

getOperationResult方法会创建一个新的BlockingQueue,并将其添加到队列的列表。这个队列会被传递给一个新的BlockingQueueSpliterator对象,后者是一个延迟绑定的Spliterator,它会遍历读取队列中的每个元素。我们很快会看到这是如何做到的。

接下来你创建了一个顺序流对该Spliterator进行遍历,最终你会创建一个Future在流上执行某个你希望的操作并收集其结果。这里的Future使用CompletableFuture类的一个静态工厂方法创建,CompletableFuture实现了Future接口。这是Java 8新引入的一个类,第16章对它进行过详细的介绍。

C.1.2 开发ForkingStreamConsumerBlockingQueueSpliterator

还有两个非常重要的部分你需要实现,分别是前面提到过的ForkingStreamConsumer类和BlockingQueueSpliterator类。你可以用下面的方式实现前者。

代码清单 C-4 实现ForkingStreamConsumer类,为其添加处理多个队列的流元素

  1. static class ForkingStreamConsumer<T> implements Consumer<T>, Results {
  2. static final Object END_OF_STREAM = new Object();
  3. private final List<BlockingQueue<T>> queues;
  4. private final Map<Object, Future<?>> actions;
  5. ForkingStreamConsumer(List<BlockingQueue<T>> queues,
  6. Map<Object, Future<?>> actions) {
  7. this.queues = queues;
  8. this.actions = actions;
  9. }
  10. @Override
  11. public void accept(T t) {
  12. queues.forEach(q -> q.add(t)); ←---- 将流中遍历的元素添加到所有的队列中
  13. }
  14. void finish() {
  15. accept((T) END_OF_STREAM); ←---- 将最后一个元素添加到队列中,表明该流已经结束
  16. }
  17. @Override
  18. public <R> R get(Object key) {
  19. try {
  20. return ((Future<R>) actions.get(key)).get(); ←---- 等待Future完成相关的计算,返回由特定键标识的处理结果
  21. } catch (Exception e) {
  22. throw new RuntimeException(e);
  23. }
  24. }
  25. }

这个类同时实现了ConsumerResults接口,并持有两个引用,一个指向由BlockingQueues组成的列表,另一个是执行了由Future构成的Map结构,它们表示的是即将在流上执行的各种操作。

Consumer接口要求实现accept方法。这里,每当ForkingStreamConsumer接受流中的一个元素,它就会将该元素添加到所有的BlockingQueues中。另外,当原始流中的所有元素都添加到所有队列后,finish方法会将最后一个元素添加到所有队列中。BlockingQueueSpliterators碰到最后这个元素时会知道队列中不再有需要处理的元素了。

Results接口需要实现get方法。一旦处理结束,get方法会获得Map中由键索引的Future,解析处理的结果并返回。

最后,流上要进行的每个操作都会对应一个BlockingQueueSpliterator。每个BlockingQueueSpliterator都持有一个指向BlockingQueues的引用,这个BlockingQueues是由ForkingStreamConsumer生成的,你可以用与下面这段代码清单类似的方法实现一个BlockingQueueSpliterator

代码清单 C-5 一个遍历BlockingQueue并读取其中元素的Spliterator

  1. class BlockingQueueSpliterator<T> implements Spliterator<T> {
  2. private final BlockingQueue<T> q;
  3. BlockingQueueSpliterator(BlockingQueue<T> q) {
  4. this.q = q;
  5. }
  6. @Override
  7. public boolean tryAdvance(Consumer<? super T> action) {
  8. T t;
  9. while (true) {
  10. try {
  11. t = q.take();
  12. break;
  13. } catch (InterruptedException e) { }
  14. }
  15. if (t != ForkingStreamConsumer.END_OF_STREAM) {
  16. action.accept(t);
  17. return true;
  18. }
  19. return false;
  20. }
  21. @Override
  22. public Spliterator<T> trySplit() {
  23. return null;
  24. }
  25. @Override
  26. public long estimateSize() {
  27. return 0;
  28. }
  29. @Override
  30. public int characteristics() {
  31. return 0;
  32. }
  33. }

这段代码实现了一个Spliterator,不过它并未定义如何切分流的策略,仅仅利用了流的延迟绑定能力。由于这个原因,它也没有实现trySplit方法。

由于无法预测能从队列中取得多少个元素,因此estimatedSize方法也无法返回任何有意义的值。更进一步,因为你没有试图进行任何切分,所以这时的估算也没什么用处。

这一实现并没有体现表7-2中列出的Spliterator的任何特性,因此characteristic方法返回0

这段代码中提供了实现的唯一方法是tryAdvance,它从BlockingQueue中取得原始流中的元素,而这些元素最初由ForkingStreamConsumer添加。依据getOperationResult方法创建Spliterator同样的方式,这些元素会被作为进一步处理流的源头传递给Consumer对象(在流上要执行的函数会作为参数传递给某个fork方法调用)。tryAdvance方法返回true以通知调用方还有其他的元素需要处理,直到它发现由ForkingStreamConsumer添加的特殊对象,表明队列中已经没有更多需要处理的元素了。图C-2展示了StreamForker及其构建模块的概述。

C.1 复制流 - 图2

图 C-2 StreamForker及其合作的构造块

这幅图中,左上角的StreamForker中包含一个Map结构,以方法的形式定义了流上要执行的操作,这些方法分别由对应的键索引。右边的ForkingStreamConsumer为每一种操作的对象维护了一个队列,原始流中的所有元素会被分发到这些队列中。

图的下半部分,每一个队列都有一个BlockingQueueSpliterator从队列中提取元素作为各个流处理的源头。最后,由原始流复制创建的每个流,都会被作为参数传递给某个处理函数,执行对应的操作。至此,你已经实现了StreamForker所有组件,可以开始工作了。

C.1.3 将StreamForker运用于实战

我们将StreamForker应用到第4章中定义的menu数据模型上,希望对它进行一些处理。通过复制原始的菜肴(dish)流,我们想以并发的方式执行四种不同的操作,代码清单如下所示。这尤其适用于以下情况:你想要生成一份由逗号分隔的菜肴名列表,计算菜单的总热量,找出热量最高的菜肴,并按照菜的类型对这些菜进行分类。

代码清单 C-6 将StreamForker运用于实战

  1. Stream<Dish> menuStream = menu.stream();
  2. StreamForker.Results results = new StreamForker<Dish>(menuStream)
  3. .fork("shortMenu", s -> s.map(Dish::getName)
  4. .collect(joining(", ")))
  5. .fork("totalCalories", s -> s.mapToInt(Dish::getCalories).sum())
  6. .fork("mostCaloricDish", s -> s.collect(reducing(
  7. (d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2))
  8. .get())
  9. .fork("dishesByType", s -> s.collect(groupingBy(Dish::getType)))
  10. .getResults();
  11. String shortMenu = results.get("shortMenu");
  12. int totalCalories = results.get("totalCalories");
  13. Dish mostCaloricDish = results.get("mostCaloricDish");
  14. Map<Dish.Type, List<Dish>> dishesByType = results.get("dishesByType");
  15. System.out.println("Short menu: " + shortMenu);
  16. System.out.println("Total calories: " + totalCalories);
  17. System.out.println("Most caloric dish: " + mostCaloricDish);
  18. System.out.println("Dishes by type: " + dishesByType);

StreamForker提供了一种使用简便、结构流畅的API,它能够复制流,并对每个复制的流施加不同的操作。这些应用在流上以函数的形式表示,可以用任何对象的方式标识,在这个例子里,我们选择使用String的方式。如果你没有更多的流需要添加,那么可以调用StreamForkergetResults方法,触发所有定义的操作开始执行,并取得StreamForker.Results。由于这些操作的内部实现就是异步的,getResults方法调用后会立刻返回,不会等待所有的操作完成,拿到所有的执行结果才返回。

你可以通过向StreamForker.Results接口传递标识特定操作的键来取得某个操作的结果。如果该时刻操作已经完成,get方法就会返回对应的结果;否则,该方法会阻塞,直到计算结束,取得对应的操作结果。

正如我们所预期的,这段代码会产生下面这些输出:

  1. Short menu: pork, beef, chicken, french fries, rice, season fruit, pizza,
  2. prawns, salmon
  3. Total calories: 4300
  4. Most caloric dish: pork
  5. Dishes by type: {OTHER=[french fries, rice, season fruit, pizza], MEAT=[pork,
  6. beef, chicken], FISH=[prawns, salmon]}