16.5 响应CompletableFuturecompletion事件

截至目前,本章你看到的所有示例代码都是在响应之前添加1秒钟等待延迟模拟方法的远程调用。毫无疑问,现实世界中,你的应用访问各远程服务时很可能遭遇无法预知的延迟,触发的原因多种多样,从服务器负荷到网络延迟,有些甚至是远程服务如何评估你应用的商业价值,即可能相对于其他应用,你的应用每次查询的消耗时间更长。

由于这些原因,你想要的商品在某些商店的查询速度会比另一些商店更快。接下来的代码清单中会通过ramdomDelay方法添加一个介于0.5到2.5秒钟之间的随机延迟模拟这种场景,不再使用固定1秒钟的延迟。

代码清单 16-21 一个模拟生成0.5秒至2.5秒随机延迟的方法

  1. private static final Random random = new Random();
  2. public static void randomDelay() {
  3. int delay = 500 + random.nextInt(2000);
  4. try {
  5. Thread.sleep(delay);
  6. } catch (InterruptedException e) {
  7. throw new RuntimeException(e);
  8. }
  9. }

目前为止,你实现的findPrices方法只有在取得所有商店的返回值时才显示商品的价格。而你希望的效果是,只要有商店返回商品价格就在第一时间显示返回值,不再等待那些还未返回的商店(有些甚至会发生超时)。如何实现这种更进一步的改进要求呢?

16.5.1 对最佳价格查询器应用的优化

你要避免的首要问题是,等待创建一个包含了所有价格的List创建完成。你应该做的是直接处理CompletableFuture流,这样每个CompletableFuture都在为某个商店执行必要的操作。为了实现这一目标,在下面的代码清单中,你会对代码清单16-16中代码实现的第一部分进行重构,实现findPricesStream方法来生成一个由CompletableFuture构成的流。

代码清单 16-22 重构findPrices方法返回一个由Future构成的流

  1. public Stream<CompletableFuture<String>> findPricesStream(String product) {
  2. return shops.stream()
  3. .map(shop -> CompletableFuture.supplyAsync(
  4. () -> shop.getPrice(product), executor))
  5. .map(future -> future.thenApply(Quote::parse))
  6. .map(future -> future.thenCompose(quote ->
  7. CompletableFuture.supplyAsync(
  8. () -> Discount.applyDiscount(quote), executor)));
  9. }

现在,你为findPricesStream方法返回的Stream添加了第4个map操作,在此之前,你已经在该方法内部调用了三次map。这个新添加的操作其实很简单,只是在每个CompletableFuture上注册一个操作,该操作会在CompletableFuture完成执行后使用它的返回值。Java 8的CompletableFutureAPI通过thenAccept方法提供了这一功能,它接受CompletableFuture执行完毕后的返回值做参数。在这里的例子中,该值是由Discount服务返回的字符串值,它包含了提供请求商品的商店名称及折扣价格,你想要做的操作也很简单,只是将结果打印输出:

  1. findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));

注意,和你之前看到的thenComposethenCombine方法一样,thenAccept方法也提供了一个异步版本,名为thenAcceptAsync。异步版本的方法会对处理结果的消费者进行调度,从线程池中选择一个新的线程继续执行,不再由同一个线程完成CompletableFuture的所有任务。因为你想要避免不必要的上下文切换,更重要的是你希望避免在等待线程上浪费时间,尽快响应CompletableFuturecompletion事件,所以这里没有采用异步版本。

由于thenAccept方法已经定义了如何处理CompletableFuture返回的结果,一旦CompletableFuture计算得到结果,它就返回一个CompletableFuture。因此,map操作返回的是一个Stream>。对这个CompletableFuture 对象,你能做的事非常有限,只能等待其运行结束,不过这也是你所期望的。你还希望能给最慢的商店一些机会,让它有机会打印输出返回的价格。为了实现这一目的,你可以把构成Stream的所有CompletableFuture对象放到一个数组中,等待所有的任务执行完成,代码如下所示。

代码清单 16-23 响应CompletableFuturecompletion事件

  1. CompletableFuture[] futures = findPricesStream("myPhone")
  2. .map(f -> f.thenAccept(System.out::println))
  3. .toArray(size -> new CompletableFuture[size]);
  4. CompletableFuture.allOf(futures).join();

allOf工厂方法接受一个由CompletableFuture构成的数组,数组中的所有CompletableFuture对象执行完成之后,它返回一个CompletableFuture对象。这意味着,如果你需要等待最初Stream中的所有CompletableFuture对象执行完毕,那么对allOf方法返回的CompletableFuture执行join操作是个不错的主意。这个方法对“最佳价格查询器”应用也是有用的,因为你的用户可能会困惑是否后面还有一些价格没有返回,使用这个方法,你可以在执行完毕之后打印输出一条消息“All shops returned results or timed out”。

然而在另一些场景中,你可能希望只要CompletableFuture对象数组中有任何一个执行完毕就不再等待,比如,你正在查询两个汇率服务器,任何一个返回了结果都能满足你的需求。在这种情况下,你可以使用一个类似的工厂方法anyOf。该方法接受一个CompletableFuture对象构成的数组,返回由第一个执行完毕的CompletableFuture对象的返回值构成的CompletableFuture

16.5.2 付诸实践

正如本节开篇所讨论的,现在你可以通过代码清单16-21中的randomDelay方法模拟远程方法调用,产生一个介于0.5秒到2.5秒的随机延迟,不再使用恒定1秒的延迟值。代码清单16-23应用了这一改变,执行这段代码你会看到不同商店的价格不再像之前那样总是在一个时刻返回,而是随着商店折扣价格返回的顺序逐一地打印输出。为了让这一改变的效果更加明显,我们对代码进行了微调,在输出中打印每个价格计算所消耗的时间:

  1. long start = System.nanoTime();
  2. CompletableFuture[] futures = findPricesStream("myPhone27S")
  3. .map(f -> f.thenAccept(
  4. s -> System.out.println(s + " (done in " +
  5. ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
  6. .toArray(size -> new CompletableFuture[size]);
  7. CompletableFuture.allOf(futures).join();
  8. System.out.println("All shops have now responded in "
  9. + ((System.nanoTime() - start) / 1_000_000) + " msecs");

运行这段代码所产生的输出如下:

  1. BuyItAll price is 184.74 (done in 2005 msecs)
  2. MyFavoriteShop price is 192.72 (done in 2157 msecs)
  3. LetsSaveBig price is 135.58 (done in 3301 msecs)
  4. ShopEasy price is 167.28 (done in 3869 msecs)
  5. BestPrice price is 110.93 (done in 4188 msecs)
  6. All shops have now responded in 4188 msecs

我们看到,由于随机延迟的效果,第一次价格查询比最慢的查询要快两倍多。