15.4 为并发而生的CompletableFuture和结合器

Future接口的一个问题是它是一个接口,你需要思考如何设计你的并发代码结构才能采用Future实现你的任务。不过,历史上,除了FutureTask这一实现之外,Future也提供了其他几个动作:创建一个Future指定它执行某个计算任务,执行任务,等待执行终止,等等。新版Java提供了更多结构的支持(譬如RecursiveTask,第7章已经介绍过)。

Java 8为这场盛宴带来的是对组合式Future的支持。使用Future接口的CompletableFuture实现,你可以创建组合式的Future。那么,为什么要称其为CompletableFuture,而不是ComposableFuture呢?普通的Future通常是通过一个Callable创建的,它执行完毕后,可以使用get()获得执行的结果。而CompletableFuture允许用户创建一个未指定运行任何代码的Future对象,之后由complete()方法指定其他的线程和值(这里是变量名)完成任务的执行,这样一来get()方法就能获得返回值了。譬如,为了并发地计算f(x)g(x)的和,你可以编写下面的代码:

  1. public class CFComplete {
  2. public static void main(String[] args)
  3. throws ExecutionException, InterruptedException {
  4. ExecutorService executorService = Executors.newFixedThreadPool(10);
  5. int x = 1337;
  6. CompletableFuture<Integer> a = new CompletableFuture<>();
  7. executorService.submit(() -> a.complete(f(x)));
  8. int b = g(x);
  9. System.out.println(a.get() + b);
  10. executorService.shutdown();
  11. }
  12. }

或者你也可以这么写:

  1. public class CFComplete {
  2. public static void main(String[] args)
  3. throws ExecutionException, InterruptedException {
  4. ExecutorService executorService = Executors.newFixedThreadPool(10);
  5. int x = 1337;
  6. CompletableFuture<Integer> a = new CompletableFuture<>();
  7. executorService.submit(() -> b.complete(g(x)));
  8. int a = f(x);
  9. System.out.println(a + b.get());
  10. executorService.shutdown();
  11. }
  12. }

注意,这两种代码实现都会浪费处理资源(回顾一下15.2.3节中的内容),因为有线程执行get()调用而阻塞——前一段代码中的f(x)可能占用较长的时间,后一段代码中g(x)可能占用较长的时间。使用Java 8的CompletableFuture能帮你解决这个问题。不过,先让我们做一个测验。

测验15.1

进一步阅读之前,请思考一个问题。下面这个例子中,怎样才能编写任务,充分利用线程的处理能力: 两个活跃线程分别执行着f(x)g(x),第一个线程执行完毕时,立刻启动一个新的线程返回计算的结果。

答案是,你可以让第一个任务执行f(x),第二个任务执行g(x),第三个任务(它既可以是一个新的线程,也可以复用现存线程中的一个)计算二者之和。不过,第三个任务在前两个任务结束之前不能开始执行。你该如何使用Java解决这个问题呢?

解决方案是使用Future的组合操作。

首先,请回顾一下我们学习过的组合操作,本书中你已经碰到过两次。组合操作是一种强大的程序构造思想,存在于多种语言之中。然而,它在Java中大展拳脚还是伴随着Java 8 Lambda表达式的引入。组合思想的一个例子是在Stream上操作的组合,如下所示:

  1. myStream.map(...).filter(...).sum()

这一思想的另一个实例是你可以对两个函数使用compose()andThen(),生成一个新的函数(详情请参见15.5节)。

这一技术给了你新的途径,使用CompletableFuturethenCombine方法对你的两个计算结果求和显然更好。不用担心看不懂这里的细节,第16章会更深入地讨论这一话题。thenCombine的方法签名如下(为了避免被泛型和通配符搞晕,这里进行了一些简化):

  1. CompletableFuture<V> thenCombine(CompletableFuture<U> other,
  2. BiFunction<T, U, V> fn)

这个方法接受两个CompletableFuture值(返回结果类型分别是TU),并创建一个新值(返回结果类型为V)。前两个值执行结束时,它取得其执行结果,并将结果传递给fn处理,完成返回结果Future的构造,整个过程都没有阻塞发生。你之前的代码现在可以用下面的形式重写:

  1. public class CFCombine {
  2. public static void main(String[] args) throws ExecutionException,
  3. InterruptedException {
  4. ExecutorService executorService = Executors.newFixedThreadPool(10);
  5. int x = 1337;
  6. CompletableFuture<Integer> a = new CompletableFuture<>();
  7. CompletableFuture<Integer> b = new CompletableFuture<>();
  8. CompletableFuture<Integer> c = a.thenCombine(b, (y, z)-> y + z);
  9. executorService.submit(() -> a.complete(f(x)));
  10. executorService.submit(() -> b.complete(g(x)));
  11. System.out.println(c.get());
  12. executorService.shutdown();
  13. }
  14. }

thenCombine这一行代码非常关键: 它在完全不了解Future对象ab要执行什么计算任务的前提下,在线程池中创建了一个计划执行的新计算任务。这个新的执行任务只在前两个执行任务完成之后才会被启动。第三个执行任务c会对前两个执行任务的结果进行求和,(最重要的是)它直到前两个执行任务执行完毕之后才被授权可以在线程上执行,而不是一开始就启动执行,然后阻塞等待前两个线程执行结束。因此,这种设计实际不存在等待的操作,而之前两个版本的代码都有阻塞等待的问题。前两个版本中,如果Future中的计算任务先完成的是第二个,那么线程池中的两个线程都会处于活动状态,即便你这时只需要一个线程执行计算任务。图15-8以图表的方式展示了这种情况。前两个版本中,计算y+x都在固定的线程中,要么是计算f(x)的线程,要么是计算g(x)的线程——二者之间都可能发生等待。与此相反,采用thenCombine调度求和计算,它只会在f(x)g(x)都完成之后才进行。

15.4 为并发而生的CompletableFuture和结合器 - 图1

图 15-8 f(x)g(x)以及结果求和这三个计算的时间序列图

特别明确一下,对很多程序而言,你并不关心少数的线程由于调用了get()方法会被阻塞,因此Java 8之前的Future依然是一种有价值的编程选择。然而,如果你需要处理大量的Future对象(譬如处理大量的服务请求),那么在这种情况下,避免由于调用get()产生的阻塞、并发性的损失甚至是死锁,使用CompletableFuture以及结合器通常是最佳的选择。