15.2 同步及异步API

第7章中展示了如何使用Java 8中的流充分发挥并行硬件的处理能力。这个过程包含两个阶段。首先,你需要使用内部迭代(通过Stream提供的方法)替换外部迭代(显式的for循环)。接着,你可以使用parallel()方法对流进行处理,流中的元素会被Java运行时并发地处理,程序员不再需要使用复杂的线程创建操作重写每一个循环。采用这种方式的另外一个好处是,运行时系统对循环执行时的可用线程数了解更多,而程序员对此往往一头雾水,很多时候都是在猜。

除了循环计算,并行也能为其他的场景带来好处,其中重要的一项就是异步API,它构成了本章、第16章以及第18章的背景,这也是Java的一个重大改进。

下面以一个运行的实例来说明该问题。假设你需要统计方法f和方法g的执行结果,这两个方法的函数签名如下:

  1. int f(int x);
  2. int g(int x);

特别强调一下,我们提到的这些函数签名都是同步API,因为它们物理上返回时,其执行结果也一同返回了。如果你还是很困惑,没关系,很快就能理解了。你可能使用下面这段代码同时调用这两个API,并打印输出它们执行结果之和:

  1. int y = f(x);
  2. int z = g(x);
  3. System.out.println(y + z);

假设方法f和方法g的执行时间都很长(这些方法可能实现了一个数学最优化任务,譬如梯度递归,不过在第16章和第17章中会使用更加实用的例子,即执行互联网查询)。通常而言,Java编译器不会对这段代码执行任何的优化,因为fg可能存在一些交互,而编译器对此知之甚少。然而,如果你非常明确地知道fg不存在任何的交互,或者你对此毫不关心,那么你可以在各自独立的CPU核上分别执行fg,从而缩短程序的执行时间。这种情况下,程序执行的总时间就变成了调用fg中耗时最长的那一个,而不是二者之和了。你需要做的就是在不同的线程中执行fg。这是个很棒的想法,然而代码的逻辑变得更加复杂了4:

4这里的复杂度部分源于需要将线程处理的结果传回。Lambda或者内部类中只能使用final类型的外部对象变量,不过真正困难的是你需要显式地操纵所有的线程。

  1. class ThreadExample {
  2. public static void main(String[] args) throws InterruptedException {
  3. int x = 1337;
  4. Result result = new Result();
  5. Thread t1 = new Thread(() -> { result.left = f(x); } );
  6. Thread t2 = new Thread(() -> { result.right = g(x); });
  7. t1.start();
  8. t2.start();
  9. t1.join();
  10. t2.join();
  11. System.out.println(result.left + result.right);
  12. }
  13. private static class Result {
  14. private int left;
  15. private int right;
  16. }
  17. }

这段代码还可以使用Future API而不是Runnable进一步简化。假设你之前已经建立了一个名为ExecutorService的线程池(譬如executorService),你可以实现下面这段代码:

  1. public class ExecutorServiceExample {
  2. public static void main(String[] args)
  3. throws ExecutionException, InterruptedException {
  4. int x = 1337;
  5. ExecutorService executorService = Executors.newFixedThreadPool(2);
  6. Future<Integer> y = executorService.submit(() -> f(x));
  7. Future<Integer> z = executorService.submit(() -> g(x));
  8. System.out.println(y.get() + z.get());
  9. executorService.shutdown();
  10. }
  11. }

然而,这段代码依然受到了显式调用submit时使用的模板代码的污染。

你需要更好的方式来表达这种思想,就像流的内部迭代避免了使用线程创建语法来并发外部迭代那样。

解决这个问题的答案是将API由同步API变为异步API。5这种方式下,方法不再在物理返回其调用者的同时返回它的执行结果,被调用函数可以在返回结果就绪之前物理上提前返回调用函数,如图15-6所示。由此,对f的调用以及该方法调用之后的代码(这里指的是对g方法的调用)可以并发地执行。通过两种方法可以实现这种并行,不过它们都会改变fg的签名。

5同步API也被称作阻塞式API,方法的物理返回会延迟到返回结果就绪为止(想象执行一次I/O操作的场景),而异步API天然就适合实现非阻塞式I/O(API仅仅负责发起I/O操作,并不等待执行结果。市面上很多库都支持非阻塞I/O操作,譬如Netty)。

第一种方法是使用Java Future的改进版本。Future最初在Java 5中引入,在Java 8中做了进一步的增强,成为了可以组合的CompletableFuture。15.4节会详细介绍这个概念,第16章会以一个实际的例子讲解该Java API的使用。第二种方法是使用Java 9 java.util.concurrent.Flow接口的反应式编程风格,它基于15.5节介绍的“发布–订阅”协议。第17章会结合实际的例子介绍这部分内容。

那么这些可选方案对fg的函数签名有什么影响呢?

15.2.1 Future风格的API

采用这种方式的话,fg的签名

  1. Future<Integer> f(int x);
  2. Future<Integer> g(int x);

需要变更为:

  1. Future<Integer> y = f(x);
  2. Future<Integer> z = g(x);
  3. System.out.println(y.get() + z.get());

其思想是方法f会返回一个Future对象,该对象包含一个继续执行方法体中原始内容的任务,不过方法执行完f后会立刻返回,不会等待执行结果就绪。类似地,方法g也返回一个Future对象,第三行代码使用了一个get()方法等待这两个Future执行完毕,并计算它们的结果之和。

这个例子中,你可以保持方法g的API调用不变,仅在方法f中引入Future,并且不会降低其并发度。然而,对于大型的程序,建议你不要这样做,原因有两个。

  • 其他使用函数g的地方可能也需要Future风格的版本,因此你最好使用统一的API风格。
  • 为了充分发挥并行硬件的处理能力,以使程序运行得又快又好,将程序切分成更多粒度更细的任务是很有帮助的(当然也要控制在合理的范围之内)。

15.2.2 反应式风格的API

第二种方式的核心思想是通过修改fg的函数签名来使用回调风格的编程,如下所示:

  1. void f(int x, IntConsumer dealWithResult);

刚看到这个解决方案,你可能会非常意外。如果函数f不返回任何值,那么它该如何工作呢?答案是采用这种方法,你需要额外向f函数传递一个回调函数(其实是一个Lambda表达式)作为参数,f函数会在函数体中衍生一个任务,这个任务会在结果可用时使用它执行Lambda表达式,这样一来就不需要使用return返回值了。再次强调一下,f函数衍生出执行函数体的任务后就立刻返回了。示例代码如下:

  1. public class CallbackStyleExample {
  2. public static void main(String[] args) {
  3. int x = 1337;
  4. Result result = new Result();
  5. f(x, (int y) -> {
  6. result.left = y;
  7. System.out.println((result.left + result.right));
  8. } );
  9. g(x, (int z) -> {
  10. result.right = z;
  11. System.out.println((result.left + result.right));
  12. });
  13. }
  14. }

啊,原来如此!然而,这两个程序还是不一致。这段代码在打印输出正确结果(函数fg调用之和)之前,打印输出的是最快拿到的值(偶尔还会打印输出两次计算的和,因为这段代码没有加锁,+的两个操作数既可以在打印输出执行之前更新,也可以在打印输出执行之后更新)。解决这个问题有两种途径。

  • 你可以添加if-then-else判断,确定这两个回调函数都已经执行完毕后再调用println打印输出它们的和。为了达到这一目标,你可能还需要在恰当的位置添加锁。
  • 你还可以使用反应式风格的API。然而这种API主要适用于事件序列的处理,而非单一的结果。针对单一结果,采用Future可能更加合适。

注意,反应式编程允许方法fg多次调用它们的回调函数dealWithResult。而原始版本的fg使用return返回结果,return只能被调用一次。Future与此类似,它也只能完成一次,执行Future的计算结果可以通过get()方法获取。某种程度上,反应式风格的异步API天然更适合于处理一系列的值(稍后会将它与流对比),而Future式的API更适合作为一次性处理的概念框架。

15.5节会优化传达这个核心思想的例子,对一个可以处理诸如=C1+C2等公式的电子数据表格进行建模。

你可能会说无论采用上述哪种方法,代码都变得更加复杂了。这种说法在某种程度上是正确的,无论是哪个方法,你都不应该随意使用其API。然而,使用这些API的好处也显而易见,它们能帮你编写更简洁的代码(使用更高阶的数据结构),不需要显式地操纵线程了。此外,使用这些API时,你也需要特别留意,尤其是(a)需要长时间执行计算任务(譬如计算时间有可能长达几毫秒),或者(b)需要等待网络传输或用户输入的场景,恰当地处理这些场景能极大地提升应用的效率。对于场景(a),使用这些技术能让你的程序运行得更快,同时又避免了在代码中塞满线程处理的逻辑。对于场景(b),采用这些技术还能带来额外的好处,因为底层系统能更好地调度线程,避免发生拥塞。接下来的一节中会详细讨论后一点。

15.2.3 有害的睡眠及其他阻塞式操作

当你的应用与用户或者其他应用交互时,往往需要限制事件发生的频率,一种很自然的方式是使用sleep()方法。然而,睡眠线程依旧会占用系统资源。如果睡眠的线程数目不多,一般没什么问题,但如果有大量的线程处于睡眠状态,这就成了你必须要解决的问题(参见15.2.1节和图15-3)。

我们应该牢记的一点是,线程池中的任务即便是处于睡眠状态,也会阻塞其他任务的执行(它们无法停止已经分配了线程的任务,因为这些任务的调度是由操作系统管理的)。

当然,可能阻塞线程池中可用线程执行的不仅仅只有睡眠。任何阻塞式操作都会产生同样的效果。阻塞式操作可以分为两类:一类是等待另一个任务执行,譬如调用Futureget()方法;另一类是等待与外部交互的返回,譬如从网络、数据库服务器或者键盘这样的人机接口读取数据。

你能做什么呢?一个高屋建瓴的回答是永远不要在任务中安排阻塞操作,至少要在你的代码中添加一些异常处理逻辑(更接近生产代码的检查请参考15.2.4节)。更理想的方法是将你的任务切分成两部分——“之前”与“之后”——仅在程序执行未被阻塞时才由Java来调度“之后”部分的执行。

代码片段A,实现为一个单一任务:

  1. work1();
  2. Thread.sleep(10); ←---- 睡眠10
  3. work2();

代码片段B如下所示:

  1. public class ScheduledExecutorServiceExample {
  2. public static void main(String[] args) {
  3. ScheduledExecutorService scheduledExecutorService
  4. = Executors.newScheduledThreadPool(1);
  5. work1();
  6. scheduledExecutorService.schedule(
  7. ScheduledExecutorServiceExample::work2, 10, TimeUnit.SECONDS); ←---- work1()完成之后10秒,启动一个新的任务执行work2()
  8. scheduledExecutorService.shutdown();
  9. }
  10. public static void work1(){
  11. System.out.println("Hello from Work1!");
  12. }
  13. public static void work2(){
  14. System.out.println("Hello from Work2!");
  15. }
  16. }

假设这两个任务都在线程池中执行。

让我们看看代码A是如何执行的。首先,它会被加入线程池的执行队列,之后开始执行。执行过程中,它被sleep调用阻塞,占用了工作线程10秒钟的时间,期间没有执行任何任务。接着它开始执行work2(),执行结束后释放工作线程。与此相反,代码B会首先执行work1(),然后被终止——不过终止之前它会调度等待队列中的任务先执行work2() 10秒钟。

代码B看起来更好,但是为什么呢?代码A和代码B所做的是同一件事情。差别是代码A在其睡眠期间占用了宝贵的线程时间,而代码B并没有傻傻地睡眠,其调度执行了队列中的另一个任务(同时也消耗了几个字节的内存,然而并没有创建新的线程)。

这种效果是你创建任务时应该牢记于心的。任务在执行时会占用宝贵的系统资源,因此,你的目标是让它们持续地处于运行状态,直至其执行完毕,或者释放出使用的资源。任务在提交完后续任务后应该终止执行,而不是被阻塞。

这一原则也应该尽可能地应用于I/O。任务启动读方法调用,或者读结束终止读方法调用,请求运行时库调度一个后续任务,都应该使用非阻塞操作,尽量不要使用传统的阻塞式读取。

这种设计模式似乎会造成大量难于理解的代码。不过Java的CompletableFuture接口(详情请参考15.4节和第16章)在运行时库中对这种风格的代码进行了抽象,你可以使用结合器(combinator)解决这一问题,而无须使用前文介绍的Future的阻塞式操作get()

最后总结一下,如果线程数量是无限的,并且创建线程的开销可以忽略不计的话,那么代码A和代码B都是不错的解决方案。然而,现实世界并非如此,只要你的任务中有线程可能进入睡眠状态,或者会被阻塞,这种情况下代码B无疑是更好的方案。

15.2.4 实战验证

如果你正在设计一个新系统,希望它能充分利用并行硬件的处理能力,那么把它设计成大量小型、并发的任务,同时以异步调用的方式实现所有可能阻塞的操作很可能是最理想的途径。然而,这种“全异步”(everything asynchronous)的设计模式可能并不符合项目实际情况(还记得著名的谚语“至善者,善之敌”吗)。Java从2002年发布的Java 1.4开始就已经有非阻塞式的I/O原语(java.nio)了,然而它们由于过于复杂,应用并不广泛。现实而言,建议你找出能受益于Java并发API的场景,充分利用这些API,而不必额外花精力将每一个API都变成异步的。

你还可以研究一下更新的库,譬如Netty,它提供了用于创建网络服务器的统一的阻塞/非阻塞API。

15.2.5 如何使用异步API进行异常处理

无论是基于Future的异步API还是反应式异步API,被调方法的概念体(conceptual body)都在另一个线程中执行,调用方很可能已经退出了执行,不在调用异常处理器的作用域内。很明显,这种非常规行为触发的异常需要通过其他的动作来处理。然而,这种动作到底是什么呢?FutureCompletableFuture实现中包含的get()方法可以返回异常的信息,此外,你还可以通过像exceptionally()这样的方法进行异常恢复,更多内容将在第16章深入讨论。

对于反应式异步API,你需要修改接口以引入额外的回调函数,这个回调函数会在触发异常时被调用,其方式就像不使用return返回,而是执行设定的回调函数一样。为了实现这种设计,你需要在反应式API中使用多个回调函数,示例如下:

  1. void f(int x, Consumer<Integer> dealWithResult,
  2. Consumer<Throwable> dealWithException);

接着函数f的函数体可能会执行:

  1. dealWithException(e);

如果有多个回调函数,你可以将它们等价地封装成单一对象中的方法来传递一个对象而不是传递多个回调函数。譬如,Java 9的Flow API就将多个回调函数封装成了一个对象(即Subscriber类,它包含了四个回调函数形式的方法)。下面是其中的三个函数:

  1. void onComplete()
  2. void onError(Throwable throwable)
  3. void onNext(T item)

相互独立的回调函数代表了不同的含义,譬如值可以访问了(onNext)、获取值时发生了异常(onError),或者程序收到信号接下来没有新的数据(或者异常),此时就会调用onComplete函数。前面的示例中,f的API可以定义为:

  1. void f(int x, Subscriber<Integer> s);

f函数体现在借助执行下面的操作,以Throwable t的形式表示了一个异常:

  1. s.onError(t);

可以拿这个包含多个回调函数的API与从文件或键盘设备读取数字作对比。如果你将这种设备想象成一个生产者而不是被动的数据结构,它就会产生一个由“这是一个数字”或者“这是一个畸形元素,而非一个数字”这样的元素构成的序列,最终收到通知“没有更多要访问的字符了(文件末尾)”。

我们通常将这些调用称作消息或者事件。譬如,你可以说文件阅读器生产了数字事件3、7以及42,之后它返回了一个畸形数字事件,接着又生产了数字事件2,然后接到了文件末尾事件。

将这些事件看作API的一部分时,要特别注意API并没有保证这些事件之间的相对顺序(我们经常称之为管道协议)。实际操作时,API的附属文档中通常会使用“接收到onComplete事件后,API就不会对后续的事件进行处理了”这样的语句说明协议相关的信息。