16.2 实现异步API

为了实现最佳价格查询器应用,让我们从每个商店都应该提供的API定义入手。首先,商店应该声明依据指定产品名称返回价格的方法:

  1. public class Shop {
  2. public double getPrice(String product) {
  3. // 待实现
  4. }
  5. }

该方法的内部实现会查询商店的数据库,但也有可能执行一些别的耗时的任务,比如联系其他外部服务(比如,商店的供应商,或者跟制造商相关的推广折扣)。本章剩下的内容中会采用delay方法模拟这些长期运行的方法的执行,它会人为地引入1秒钟的延迟,方法声明如下。

代码清单 16-2 模拟1秒钟延迟的方法

  1. public static void delay() {
  2. try {
  3. Thread.sleep(1000L);
  4. } catch (InterruptedException e) {
  5. throw new RuntimeException(e);
  6. }
  7. }

为了介绍本章的内容,getPrice方法会调用delay方法,并返回一个随机计算的值,代码清单如下所示。返回随机计算的价格这段代码看起来有些取巧。它使用charAt,依据产品的名称,生成一个随机值作为价格。

代码清单 16-3 在getPrice方法中引入一个模拟的延迟

  1. public double getPrice(String product) {
  2. return calculatePrice(product);
  3. }
  4. private double calculatePrice(String product) {
  5. delay();
  6. return random.nextDouble() * product.charAt(0) + product.charAt(1);
  7. }

很明显,这个API的使用者(这个例子中为最佳价格查询器)调用该方法时,它依旧会被阻塞。为等待同步事件完成而等待1秒钟,这是无法接受的,尤其是考虑到最佳价格查询器对网络中的所有商店都要重复这种操作。本章接下来的小节中,你会了解如何以异步方式使用同步API解决这个问题。但是,出于学习如何设计异步API的考虑,我们会继续这一节的内容,假装还在深受这一困难的烦扰:你是一个睿智的商店店主,已经意识到了这种同步API会为你的用户带来多么痛苦的体验,你希望以异步API的方式重写这段代码,让用户更流畅地访问你的网站。

16.2.1 将同步方法转换为异步方法

为了实现这个目标,你首先需要将getPrice转换为getPriceAsync方法,并修改它的返回值:

  1. public Future<Double> getPriceAsync(String product) { ... }

本章开头已经提到,Java 5引入了java.util.concurrent.Future接口表示一个异步计算(即调用线程可以继续运行,不会因为调用方法而阻塞)的结果。这意味着Future是一个暂时还不可知值的处理器,这个值在计算完成后,可以通过调用它的get方法取得。因为这样的设计,getPriceAsync方法才能立刻返回,给调用线程一个机会,能在同一时间去执行其他有价值的计算任务。新的CompletableFuture类提供了大量的方法,让我们有机会以多种可能的方式轻松地实现这个方法,比如下面就是这样一段实现代码。

代码清单 16-4 getPriceAsync方法的实现

  1. public Future<Double> getPriceAsync(String product) {
  2. CompletableFuture<Double> futurePrice = new CompletableFuture<>(); ←---- 创建CompletableFuture对象,它会包含计算的结果
  3. new Thread( () -> {
  4. double price = calculatePrice(product); ←---- 在另一个线程中以异步方式执行计算
  5. futurePrice.complete(price); ←---- 需长时间计算的任务结束并得出结果时,设置Future的返回值
  6. }).start();
  7. return futurePrice; ←---- 无须等待还没结束的计算,直接返回Future对象
  8. }

在这段代码中,你创建了一个代表异步计算的CompletableFuture对象实例,它在计算完成时会包含计算的结果。接着,你调用fork创建了另一个线程去执行实际的价格计算工作,不等该耗时计算任务结束,直接返回一个Future实例。当请求的产品价格最终计算得出时,你可以使用它的complete方法,结束CompletableFuture对象的运行,并设置变量的值。很显然,这个新版Future的名称也解释了它所具有的特性。使用这个API的客户端,可以通过下面的这段代码对其进行调用。

代码清单 16-5 使用异步API

  1. Shop shop = new Shop("BestShop");
  2. long start = System.nanoTime();
  3. Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); ←---- 查询商店,试图取得商品的价格
  4. long invocationTime = ((System.nanoTime() - start) / 1_000_000);
  5. System.out.println("Invocation returned after " + invocationTime
  6. + " msecs");
  7. // 执行更多任务,比如查询其他商店
  8. doSomethingElse();
  9. // 在计算商品价格的同时
  10. try {
  11. double price = futurePrice.get();
  12. System.out.printf("Price is %.2f%n", price); ←---- Future对象中读取价格,如果价格未知,会发生阻塞
  13. } catch (Exception e) {
  14. throw new RuntimeException(e);
  15. }
  16. long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
  17. System.out.println("Price returned after " + retrievalTime + " msecs");

上面这段代码中,客户向商店查询了某种商品的价格。由于商店提供了异步API,该次调用立刻返回了一个Future对象,通过该对象客户可以在将来的某个时刻取得商品的价格。这种方式下,客户在进行商品价格查询的同时,还能执行一些其他的任务,比如查询其他家商店中商品的价格,而不会呆呆地阻塞在那里等待第一家商店返回请求的结果。最后,如果所有有意义的工作都已经完成,客户所有要执行的工作都依赖于商品价格时,就再调用Futureget方法。执行了这个操作后,客户要么获得Future中封装的值(如果异步任务已经完成),要么发生阻塞,直到该异步任务完成,期望的值能够访问。代码清单16-5产生的输出可能是下面这样:

  1. Invocation returned after 43 msecs
  2. Price is 123.26
  3. Price returned after 1045 msecs

你一定已经发现getPriceAsync方法的调用返回远远早于最终价格计算完成的时间。在16.4节中,你还会知道我们有可能避免发生客户端被阻塞的风险。实际上这非常简单,Future执行完毕可以发送一个通知,仅在计算结果可用时执行一个由Lambda表达式或者方法引用定义的回调函数。不过,当下不会对此进行讨论,现在要解决的是另一个问题:如何正确地管理异步任务执行过程中可能出现的错误。

16.2.2 错误处理

如果没有意外,我们目前开发的代码工作得很正常。但是,如果价格计算过程中产生了错误会怎样呢?非常不幸,这种情况下你会得到一个相当糟糕的结果:用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久地被阻塞。

客户端可以使用重载版本的get方法,它使用一个超时参数来避免发生这样的情况。这是一种值得推荐的做法,你应该尽量在你的代码中添加超时判断的逻辑,避免发生类似的问题。使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会得到通知发生了TimeoutException。不过,也因为如此,你不会有机会发现计算商品价格的线程内到底发生了什么问题才引发了这样的失效。为了让客户端能了解商店无法提供请求商品价格的原因,你需要使用CompletableFuturecompleteExceptionally方法将导致CompletableFuture内发生问题的异常抛出。对代码清单16-4优化后的结果如下所示。

代码清单 16-6 抛出CompletableFuture内的异常

  1. public Future<Double> getPriceAsync(String product) {
  2. CompletableFuture<Double> futurePrice = new CompletableFuture<>();
  3. new Thread( () -> {
  4. try {
  5. double price = calculatePrice(product);
  6. futurePrice.complete(price); ←---- 如果价格计算正常结束,就完成Future操作并设置商品价格
  7. } catch (Exception ex) {
  8. futurePrice.completeExceptionally(ex); ←---- 否则就抛出导致失败的异常,完成这次Future操作
  9. }
  10. }).start();
  11. return futurePrice;
  12. }

客户端现在会收到一个ExecutionException异常,该异常接受了一个包含失败原因的Exception参数,即价格计算方法最初抛出的异常。所以,举例来说,如果该方法抛出了一个运行时异常“product isn't available”,客户端就会得到像下面这样一段ExecutionException

  1. Exception in thread "main" java.lang.RuntimeException:
  2. java.util.concurrent.ExecutionException: java.lang.RuntimeException:
  3. product not available
  4. at java89inaction.chap16.AsyncShopClient.main(AsyncShopClient.java:16)
  5. Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException:
  6. product not available
  7. at java.base/java.util.concurrent.CompletableFuture.reportGet
  8. (CompletableFuture.java:395)
  9. at java.base/java.util.concurrent.CompletableFuture.get
  10. (CompletableFuture.java:1999)
  11. at java89inaction.chap16.AsyncShopClient.main(AsyncShopClient.java:14)
  12. Caused by: java.lang.RuntimeException: product not available
  13. at java89inaction.chap16.AsyncShop.calculatePrice(AsyncShop.java:38)
  14. at java89inaction.chap16.AsyncShop.lambda$0(AsyncShop.java:33)
  15. at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run
  16. (CompletableFuture.java:1700)
  17. at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec
  18. (CompletableFuture.java:1692)
  19. at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:283)
  20. at java.base/java.util.concurrent.ForkJoinPool.runWorker
  21. (ForkJoinPool.java:1603)
  22. at java.base/java.util.concurrent.ForkJoinWorkerThread.run
  23. (ForkJoinWorkerThread.java:175)
使用工厂方法supplyAsync创建CompletableFuture

目前为止我们已经了解了如何通过编程创建CompletableFuture对象以及如何获取返回值,虽然看起来这些操作已经比较方便,但还有进一步提升的空间,CompletableFuture类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担心实现的细节。比如,采用supplyAsync方法后,你可以用一行语句重写代码清单16-4中的getPriceAsync方法,如下所示。

代码清单 16-7 使用工厂方法supplyAsync创建CompletableFuture对象

  1. public Future<Double> getPriceAsync(String product) {
  2. return CompletableFuture.supplyAsync(() -> calculatePrice(product));
  3. }

supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法。一般而言,向CompletableFuture的工厂方法传递可选参数,指定生产者方法的执行线程是可行的,在16.3.4节中,你会使用这一能力,该小节会介绍如何使用适合你应用特性的执行线程改善程序的性能。

此外,代码清单16-7中getPriceAsync方法返回的CompletableFuture对象与代码清单16-6中你手工创建和完成的CompletableFuture对象是完全等价的,这意味着它提供了同样的错误管理机制,而前者你花费了大量的精力才得以构建。

本章的剩余部分中,我们会假设你非常不幸,无法控制Shop类提供API的具体实现,最终提供给你的API都是同步阻塞式的方法。这也是当你试图使用服务提供的HTTP API时最常发生的情况。你会学到如何以异步的方式查询多个商店,避免被单一的请求所阻塞,并由此提升你的“最佳价格查询器”的性能和吞吐量。