16.3 让你的代码免受阻塞之苦

所以,你已经被要求进行“最佳价格查询器”应用的开发了,不过你需要查询的所有商店都如16.2节开始时介绍的那样,只提供了同步API。换句话说,你有一个商家的列表,如下所示:

  1. List<Shop> shops = List.of(new Shop("BestPrice"),
  2. new Shop("LetsSaveBig"),
  3. new Shop("MyFavoriteShop"),
  4. new Shop("BuyItAll"));

你需要使用下面这样的签名实现一个方法,它接受产品名作为参数,返回一个字符串列表,这个字符串列表中包括商店的名称和该商店中指定商品的价格:

  1. public List<String> findPrices(String product);

你的第一个想法可能是使用在第4、5、6章中学习的Stream特性。你可能试图写出类似下面这个清单中的代码(是的,作为第一个方案,如果你想到这些已经相当棒了!)。

代码清单 16-8 采用顺序查询所有商店的方式实现的findPrices方法

  1. public List<String> findPrices(String product) {
  2. return shops.stream()
  3. .map(shop -> String.format("%s price is %.2f",
  4. shop.getName(), shop.getPrice(product)))
  5. .collect(toList());
  6. }

好吧,这段代码看起来非常直白。现在试着用该方法去查询你最近这些天疯狂着迷的唯一产品(是的,你已经猜到了,它就是myPhone27S)。此外,也请记录下方法的执行时间,通过这些数据,我们可以比较优化之后的方法会带来多大的性能提升,具体的代码清单如下。

代码清单 16-9 验证findPrices的正确性和执行性能

  1. long start = System.nanoTime();
  2. System.out.println(findPrices("myPhone27S"));
  3. long duration = (System.nanoTime() - start) / 1_000_000;
  4. System.out.println("Done in " + duration + " msecs");

上面代码的运行结果输出如下:

  1. [BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
  2. is 214.13, BuyItAll price is 184.74]
  3. Done in 4032 msecs

正如你预期的,findPrices方法的执行时间仅比四秒钟多了那么几毫秒,因为对这四个商店的查询是顺序进行的,并且一个查询操作会阻塞另一个,每一个操作都要花费大约1秒左右的时间计算请求商品的价格。你怎样才能改进这个结果呢?

16.3.1 使用并行流对请求进行并行操作

读完第7章,你应该想到的第一个,可能也是最快的改善方法是使用并行流来避免顺序计算,如下所示。

代码清单 16-10 对findPrices方法进行并行操作

  1. public List<String> findPrices(String product) {
  2. return shops.parallelStream() ←---- 使用并行流并行地从不同的商店获取价格
  3. .map(shop -> String.format("%s price is %.2f",
  4. shop.getName(), shop.getPrice(product)))
  5. .collect(toList());
  6. }

运行代码,与代码清单16-9的执行结果相比较,你发现新版findPrices的改进了吧。

  1. [BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
  2. is 214.13, BuyItAll price is 184.74]
  3. Done in 1180 msecs

相当不错啊!看起来这是个简单但有效的主意:现在对4个不同商店的查询实现了并行,所以完成所有操作的总耗时只有1秒多一点儿。你能做得更好吗?尝试使用刚学过的CompletableFuture,将findPrices方法中对不同商店的同步调用替换为异步调用。

16.3.2 使用CompletableFuture发起异步请求

你已经知道可以使用工厂方法supplyAsync创建CompletableFuture对象。让我们把它利用起来:

  1. List<CompletableFuture<String>> priceFutures =
  2. shops.stream()
  3. .map(shop -> CompletableFuture.supplyAsync(
  4. () -> String.format("%s price is %.2f",
  5. shop.getName(), shop.getPrice(product))))
  6. .collect(toList());

使用这种方式,你会得到一个List>,列表中的每个CompletableFuture对象在计算完成后都包含商店的String类型的名称。但是,由于你用CompletableFuture实现的findPrices方法要求返回一个List,因此你需要等待所有的future执行完毕,将其包含的值抽取出来,填充到列表中才能返回。

为了实现这个效果,你可以向最初的List>施加第二个map操作,对List中的所有future对象执行join操作,一个接一个地等待它们运行结束。注意CompletableFuture类中的join方法和Future接口中的get方法有相同的含义,并且也声明在Future接口中,它们唯一的不同是join不会抛出任何检测到的异常。使用它你不再需要使用try/catch语句块让你传递给第二个map方法的Lambda表达式变得过于臃肿。将所有这些整合在一起,你就可以重新实现findPrices了,具体代码如下。

代码清单 16-11 使用CompletableFuture实现findPrices方法

  1. public List<String> findPrices(String product) {
  2. List<CompletableFuture<String>> priceFutures =
  3. shops.stream()
  4. .map(shop -> CompletableFuture.supplyAsync( ←---- 使用CompletableFuture以异步方式计算每种商品的价格
  5. () -> shop.getName() + " price is " +
  6. shop.getPrice(product)))
  7. .collect(Collectors.toList());
  8. return priceFutures.stream()
  9. .map(CompletableFuture::join) ←---- 等待所有异步操作结束
  10. .collect(toList());
  11. }

注意到了吗?这里使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个地放置两个map操作——这其实是有缘由的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,那么发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定商家的动作,通知join方法返回计算结果。图16-2解释了这些重要的细节。

16.3 让你的代码免受阻塞之苦 - 图1

图 16-2 为什么Stream的延迟特性会引起顺序执行,以及如何避免

图16-2的上半部分展示了使用单一流水线处理流的过程,我们看到,执行的流程(以虚线标识)是顺序的。事实上,新的CompletableFuture对象只有在前一个操作完全结束之后,才能创建。与此相反,图的下半部分展示了如何先将CompletableFuture对象聚集到一个列表中(即图中以椭圆表示的部分),让对象们可以在等待其他对象完成操作之前就能启动。

运行代码清单16-11中的代码来了解下第三个版本findPrices方法的性能,你会得到下面这几行输出:

  1. [BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
  2. is 214.13, BuyItAll price is 184.74]
  3. Done in 2005 msecs

这个结果让人相当失望,不是吗?超过两秒意味着利用CompletableFuture实现的版本,比刚开始代码清单16-8中原生顺序执行且会发生阻塞的版本快。但是它的用时也差不多是使用并行流的前一个版本的两倍。尤其是,考虑到从顺序执行的版本转换到并行流的版本只做了非常小的改动,就让人更加沮丧。

与此形成鲜明对比的是,我们为采用CompletableFuture完成的新版方法做了大量的工作!但这就是全部的真相吗?这种场景下使用CompletableFuture真的是浪费时间吗?或者我们可能漏掉了某些重要的东西?继续往下探究之前,先休息几分钟,尤其是想想你测试代码的机器是否足以以并行方式运行四个线程。1

1如果你使用的机器足够强大,能以并行方式运行更多的线程(比如说八个线程),那你需要使用更多的商店和并行进程,才能重现这几页中介绍的行为。

16.3.3 寻找更好的方案

并行流的版本工作得非常好,那是因为它能并行地执行四个任务,所以它几乎能为每个商家分配一个线程。但是,如果你想要增加第5个商家到商店列表中,让你的“最佳价格查询”应用对其进行处理,那这时会发生什么情况?毫不意外,顺序执行版本的执行还是需要大约5秒多钟的时间,下面是执行的输出:

  1. [BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
  2. is 214.13, BuyItAll price is 184.74, ShopEasy price is 166.08]
  3. Done in 5025 msecs ←---- 使用顺序流方式的程序输出

非常不幸,并行流版本的程序这次比之前也多消耗了差不多1秒钟的时间,因为可以并行运行(通用线程池中处于可用状态)的四个线程现在都处于繁忙状态,都在对前四个商店进行查询。第5个查询只能等到前面某一个操作完成以释放出空闲线程才能继续,它的运行结果如下:

  1. [BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
  2. is 214.13, BuyItAll price is 184.74, ShopEasy price is 166.08]
  3. Done in 2177 msecs ←---- 使用并行流方式的程序输出

CompletableFuture版本的程序结果如何呢?我们也试着添加第5个商店对其进行了测试,结果如下:

  1. [BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
  2. is 214.13, BuyItAll price is 184.74, ShopEasy price is 166.08]
  3. Done in 2006 msecs ←---- 使用CompletableFuture的程序输出

CompletableFuture版本的程序似乎比并行流版本的程序还快那么一点儿。但是最后这个版本也不太令人满意。如果你试图让你的代码处理九个商店,那么并行流版本耗时3143毫秒,而CompletableFuture版本耗时3009毫秒。它们看起来不相伯仲,究其原因都一样:它们内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具有一定的优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。让我们看看你怎样利用这种配置上的灵活性带来实际应用程序性能上的提升。

16.3.4 使用定制的执行器

就这个主题而言,明智的选择似乎是创建一个配有线程池的执行器,线程池中线程的数目取决于你预计你的应用需要处理的负荷,但是该如何选择合适的线程数目呢?

调整线程池的大小

《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:

N_{{\rm threads}}=N_{{\rm CPU}}*U_{{\rm CPU}}*(1+W/C)

其中:

  • N_{{\rm CPU}}是处理器的核的数目,可以通过Runtime.getRuntime().available Processors()得到;
  • U_{{\rm CPU}}是期望的CPU利用率(该值应该介于0和1之间);
  • W/C是等待时间与计算时间的比率。

你的应用99%的时间都在等待商店的响应,所以估算出的W/C比率为100。这意味着如果你期望的CPU利用率是100%,那么你需要创建一个拥有400个线程的线程池。实际操作中,如果你创建的线程数比商店的数目更多,反而是一种浪费,因为这样做之后,你线程池中的有些线程根本没有机会被使用。出于这种考虑,建议你将执行器使用的线程数,与你需要查询的商店数目设定为同一个值,这样每个商店都应该对应一个服务线程。不过,为了避免发生由于商店的数目过多导致服务器超负荷而崩溃,你还是需要设置一个上限,比如100个线程。代码清单如下。

代码清单 16-12 为“最优价格查询器”应用定制的执行器

  1. private final Executor executor =
  2. Executors.newFixedThreadPool(Math.min(shops.size(), 100), ←---- 创建一个线程池,线程池中线程的数目为100和商店数目二者中较小的一个值
  3. (Runnable r) -> {
  4. Thread t = new Thread(r);
  5. t.setDaemon(true); ←---- 使用守护线程——这种方式不会阻止程序的关停
  6. return t;
  7. }
  8. );

注意,你现在正创建的是一个由守护线程构成的线程池。当一个普通线程在执行时,Java程序无法终止或者退出,所以最后剩下的那个线程会由于一直等待无法发生的事件而引发问题。与此相反,如果将线程标记为守护进程,则意味着程序退出时它也会被回收。这二者之间没有性能上的差异。现在,你可以将执行器作为第二个参数传递给supplyAsync工厂方法了。比如,你现在可以按照下面的方式创建一个可查询指定商品价格的CompletableFuture对象:

  1. CompletableFuture.supplyAsync(() -> shop.getName() + " price is " +
  2. shop.getPrice(product), executor);

改进之后,使用CompletableFuture方案的程序处理五个商店仅耗时1021毫秒,处理九个商店时耗时1022毫秒。一般而言,这种状态会一直持续,直到商店的数目达到我们之前计算的阈值400。这个例子证明了要创建更适合你的应用特性的执行器,利用CompletableFuture向其提交任务执行是个不错的主意。处理需大量使用异步操作的情况时,这几乎是最有效的策略。

并行——使用流还是CompletableFuture

目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。

我们对使用这些API的建议如下。

  • 如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
  • 反之,如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,你可以像前文讨论的那样,依据等待/计算,或者 W/C 的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

现在你已经了解了如何利用CompletableFuture为你的用户提供异步API,以及如何将一个同步又缓慢的服务转换为异步的服务。不过到目前为止,我们每个Futur中进行的都是单次的操作。下一节中,你会看到如何将多个异步操作结合在一起,以流水线的方式运行,从描述形式上,它与你在前面学习的Stream API有几分类似。