16.4 对多个异步任务进行流水线操作
让我们假设所有的商店都同意使用一个集中式的折扣服务。该折扣服务提供了五个不同的折扣代码,每个折扣代码对应不同的折扣率。你使用一个枚举型变量Discount.Code来实现这一想法,具体代码如下所示。
代码清单 16-13 以枚举类型定义的折扣代码
public class Discount {public enum Code {NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);private final int percentage;Code(int percentage) {this.percentage = percentage;}}// Discount类的具体实现这里暂且不表示,参见代码清单16-14}
我们还假设所有的商店都同意修改getPrice方法的返回格式。getPrice现在以ShopName:price:DiscountCode的格式返回一个String类型的值。我们的示例实现中会返回一个随机生成的Discount.Code,以及已经计算得出的随机价格:
public String getPrice(String product) {double price = calculatePrice(product);Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];return String.format("%s:%.2f:%s", name, price, code);}private double calculatePrice(String product) {delay();return random.nextDouble() * product.charAt(0) + product.charAt(1);}
调用getPrice方法可能会返回像下面这样一个String值:
BestPrice:123.26:GOLD
16.4.1 实现折扣服务
你的“最佳价格查询器”应用现在能从不同的商店取得商品价格,解析结果字符串,针对每个字符串,查询折扣服务器的折扣代码。这个流程决定了请求商品的最终折扣价格(每个折扣代码的实际折扣比率有可能发生变化,所以你每次都需要查询折扣服务)。我们已经将对商店返回字符串的解析操作封装到了下面的Quote类之中:
public class Quote {private final String shopName;private final double price;private final Discount.Code discountCode;public Quote(String shopName, double price, Discount.Code code) {this.shopName = shopName;this.price = price;this.discountCode = code;}public static Quote parse(String s) {String[] split = s.split(":");String shopName = split[0];double price = Double.parseDouble(split[1]);Discount.Code discountCode = Discount.Code.valueOf(split[2]);return new Quote(shopName, price, discountCode);}public String getShopName() { return shopName; }public double getPrice() { return price; }public Discount.Code getDiscountCode() { return discountCode; }}
通过传递shop对象返回的字符串给静态工厂方法parse,你可以得到Quote类的一个实例,它包含了shop的名称、折扣之前的价格,以及折扣代码。
Discount服务还提供了一个applyDiscount方法,它接受一个Quote对象,返回一个字符串,表示生成该Quote的shop中的折扣价格,代码如下所示。
代码清单 16-14
Discount服务
public class Discount {public enum Code {// 源码暂时省略……}public static String applyDiscount(Quote quote) {return quote.getShopName() + " price is " +Discount.apply(quote.getPrice(), ←---- 将折扣代码应用于商品最初的原始价格quote.getDiscountCode());}private static double apply(double price, Code code) {delay(); ←---- 模拟Discount服务响应的延迟return format(price * (100 - code.percentage) / 100);}}
16.4.2 使用Discount服务
由于Discount服务是一种远程服务,因此你还需要增加1秒钟的模拟延迟,代码如下所示。和在16.3节中一样,首先尝试以最直接的方式(坏消息是,这种方式是顺序而且同步执行的)重新实现findPrices,以满足这些新增的需求。
代码清单 16-15 以最简单的方式实现使用
Discount服务的findPrices方法
public List<String> findPrices(String product) {return shops.stream().map(shop -> shop.getPrice(product)) ←---- 取得每个shop对象中商品的原始价格.map(Quote::parse) ←---- 在Quote对象中对shop返回的字符串进行转换.map(Discount::applyDiscount) ←---- 联系Discount服务,为每个Quote申请折扣.collect(toList());}
通过在shop构成的流上采用流水线方式执行三次map操作,我们得到了期望的结果。
- 第一个操作将每个
shop对象转换成了一个字符串,该字符串包含了该shop中指定商品的价格和折扣代码。 - 第二个操作对这些字符串进行了解析,在
Quote对象中对它们进行转换。 - 最终,第三个
map会操作联系远程的Discount服务,计算出最终的折扣价格,并返回包含该价格及提供该价格商品的shop的字符串。
你可能已经猜到,这种实现方式的性能远非最优,不过还是应该测量一下。跟之前一样,通过运行基准测试,我们得到下面的数据:
[BestPrice price is 110.93, LetsSaveBig price is 135.58, MyFavoriteShop priceis 192.72, BuyItAll price is 184.74, ShopEasy price is 167.28]Done in 10028 msecs
毫无意外,这次执行耗时10秒,因为顺序查询五个商店耗时大约5秒,现在又加上了Discount服务为五个商店返回的价格申请折扣所消耗的5秒钟。你已经知道,把流转换为并行流的方式,非常容易提升该程序的性能。不过,通过16.3节的介绍,你也知道这一方案在商店的数目增加时,扩展性不好,因为Stream底层依赖的是线程数量固定的通用线程池。相反,你也知道,通过自定义CompletableFuture调度任务执行的执行器能够更充分地利用CPU资源。
16.4.3 构造同步和异步操作
让我们再次使用CompletableFuture提供的特性,以异步方式重新实现findPrices方法。详细代码如下所示。如果你发现有些内容不太熟悉,不用太担心,我们很快会进行针对性的介绍。
代码清单 16-16 使用
CompletableFuture实现findPrices方法
public List<String> findPrices(String product) {List<CompletableFuture<String>> priceFutures =shops.stream().map(shop -> CompletableFuture.supplyAsync( ←---- 以异步方式取得每个shop中指定产品的原始价格() -> shop.getPrice(product), executor)).map(future -> future.thenApply(Quote::parse)) ←---- Quote对象存在时,对其返回的值进行转换.map(future -> future.thenCompose(quote -> ←---- 使用另一个异步任务构造期望的Future,申请折扣CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))).collect(toList());return priceFutures.stream().map(CompletableFuture::join) ←---- 等待流中的所有Future执行完毕,并提取各自的返回值.collect(toList());}
这一次,事情看起来变得更加复杂了,所以一步一步来理解到底发生了什么。这三次转换的流程如图16-3所示。

图 16-3 构造同步操作和异步任务
你所进行的这三次map操作和代码清单16-15中的同步方案没有太大的区别,不过你使用CompletableFuture类提供的特性,在需要的地方把它们变成了异步操作。
- 获取价格
这三个操作中的第一个你已经在本章的各个例子中见过很多次,只需要将Lambda表达式作为参数传递给supplyAsync工厂方法就可以以异步方式对shop进行查询。第一个转换的结果是一个Stream,一旦运行结束,每个CompletableFuture对象中都会包含对应shop返回的字符串。注意,你对CompletableFuture进行了设置,用代码清单16-12中的方法向其传递了一个定制的执行器Executor。
- 解析报价
现在你需要通过第二次转换将字符串转变为订单。因为一般情况下解析操作不涉及任何远程服务,也不会进行任何I/O操作,它几乎可以在第一时间进行,所以能够采用同步操作,不会带来太多的延迟。由于这个原因,你可以对第一步中生成的CompletableFuture对象调用它的thenApply,将一个由字符串转换Quote的方法作为参数传递给它。
注意到了吗?直到你调用的CompletableFuture执行结束,使用的thenApply方法都不会阻塞你代码的执行。这意味着CompletableFuture最终结束运行时,你希望传递Lambda表达式给thenApply方法,将Stream中的每个CompletableFuture对象转换为对应的CompletableFuture对象。你可以把这看成是为处理CompletableFuture的结果建立了一个菜单,就像曾经为Stream的流水线所做的事儿一样。
- 为计算折扣价格构造
Future
第三个map操作涉及联系远程的Discount服务,为从商店中得到的原始价格申请折扣率。这一转换与前一个转换又不大一样,因为这一转换需要远程执行(或者,就这个例子而言,它需要模拟远程调用带来的延迟),出于这一原因,你也希望它能够异步执行。
为了实现这一目标,你像第一个调用传递getPrice给supplyAsync那样,将这一操作以Lambda表达式的方式传递给了supplyAsync工厂方法,该方法最终会返回另一个CompletableFuture对象。到目前为止,你已经进行了两次异步操作,用了两个不同的CompletableFuture对象进行建模,你希望能把它们以级联的方式串接起来进行工作。
- 从
shop对象中获取价格,接着把价格转换为Quote。 - 拿到返回的
Quote对象,将其作为参数传递给Discount服务,取得最终的折扣价格。
Java 8的CompletableFutureAPI提供了名为thenCompose的方法,它就是专门为这一目的而设计的,thenCompose方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。换句话说,你可以创建两个CompletableFuture对象,对第一个CompletableFuture对象调用thenCompose,并向其传递一个函数。当第一个CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture的返回做输入计算出的第二个CompletableFuture对象。使用这种方式,即使Future在向不同的商店收集报价,主线程还是能继续执行其他重要的操作,比如响应UI事件。
将这三次map操作返回的Stream元素收集到一个列表,你就得到了一个List ,等这些CompletableFuture对象最终执行完毕,你就可以像代码清单16-11中那样利用join取得它们的返回值。代码清单16-8实现的新版findPrices方法产生的输出如下:
[BestPrice price is 110.93, LetsSaveBig price is 135.58, MyFavoriteShop priceis 192.72, BuyItAll price is 184.74, ShopEasy price is 167.28]Done in 2035 msecs
你在代码清单16-16中使用的thenCompose方法像CompletableFuture类中的其他方法一样,也提供了一个以Async后缀结尾的版本thenComposeAsync。通常而言,名称中不带Async的方法和它的前一个任务一样,在同一个线程中运行,而名称以Async结尾的方法会将后续任务提交到一个线程池,所以每个任务是由不同的线程处理的。就这个例子而言,第二个CompletableFuture对象的结果取决于第一个CompletableFuture,所以无论你使用哪个版本的方法来处理CompletableFuture对象,对于最终的结果,或者大致的时间而言都没有多少差别。我们选择thenCompose方法的原因是因为它更高效一点,因为它少了很多线程切换的开销。注意,即便如此,也很难搞清楚到底使用的是哪一个线程,尤其是如果你的应用还使用了自己的线程池(譬如Spring),那就更加困难了。
16.4.4 将两个CompletableFuture对象整合起来,无论它们是否存在依赖
在代码清单16-16中,你对一个CompletableFuture对象调用了thenCompose方法,并向其传递了第二个CompletableFuture,而第二个CompletableFuture又需要使用第一个CompletableFuture的执行结果作为输入。但是,另一种比较常见的情况是,你需要将两个完全不相干的CompletableFuture对象的结果整合起来,而且你也不希望等到第一个任务完全结束才开始第二个任务。
这种情况下,你应该使用thenCombine方法,它接受名为BiFunction的第二个参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。同thenCompose方法一样,thenCombine方法也提供了一个Async的版本。这里,如果使用thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,那么由另一个任务以异步的方式执行。
回到我们正在运行的这个例子,你知道,有一家商店提供的价格是以欧元(EUR)计价的,但是你希望以美元的方式提供给你的客户。你可以用异步的方式向商店查询指定商品的价格,同时从远程的汇率服务那里查到欧元和美元之间的汇率。当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以美元计价的商品价格。用这种方式,你需要使用第三个CompletableFuture对象,当前两个CompletableFuture计算出结果,并由BiFunction方法完成合并后,由它来最终结束这一任务,代码清单如下。
代码清单 16-17 合并两个独立的
CompletableFuture对象
Future<Double> futurePriceInUSD =CompletableFuture.supplyAsync(() -> shop.getPrice(product)) ←---- 创建第一个任务查询商店取得商品的价格.thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), ←---- 创建第二个独立任务,查询美元和欧元之间的转换汇率(price, rate) -> price * rate ←---- 通过乘法整合得到的商品价格和汇率);
这里整合的操作只是简单的乘法操作,用另一个单独的任务对其进行操作有些浪费资源,所以你只要使用thenCombine方法,无须特别求助于异步版本的thenCombineAsync方法。图16-4展示了代码清单16-17中创建的多个任务是如何在线程池中选择不同的线程执行的,以及它们最终的运行结果又是如何整合的。

图 16-4 合并两个相互独立的异步任务
16.4.5 对Future和CompletableFuture的回顾
前文介绍的最后两个例子,即代码清单16-16和代码清单16-17,非常清晰地呈现了相对于采用Java 8之前提供的Future实现,CompletableFuture版本实现所具备的巨大优势。CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。为了更直观地感受一下使用CompletableFuture在代码可读性上带来的巨大提升,你可以尝试仅使用Java 7中提供的特性,重新实现代码清单16-17的功能。代码清单16-18展示了如何实现这一效果。
代码清单 16-18 利用Java 7的方法合并两个
Future对象
ExecutorService executor = Executors.newCachedThreadPool(); ←---- 创建一个ExecutorService将任务提交到线程池final Future<Double> futureRate = executor.submit(new Callable<Double>() {public Double call() {return exchangeService.getRate(Money.EUR, Money.USD); ←---- 创建一个查询欧元到美元转换汇率的Future}});Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() {public Double call() {double priceInEUR = shop.getPrice(product); ←---- 在第二个Future中查询指定商店中特定商品的价格return priceInEUR * futureRate.get(); ←---- 在查找价格操作的同一个Future中,将价格和汇率做乘法计算出汇后价格}});
在代码清单16-18中,你通过向执行器提交一个Callable对象的方式创建了第一个Future对象,向外部服务查询欧元和美元之间的转换汇率。紧接着,你创建了第二个Future对象,查询指定商店中特定商品的欧元价格。最终,用与代码清单16-17一样的方式,你在同一个Future中通过查询商店得到的欧元商品价格乘以汇率得到了最终的价格。注意,代码清单16-17中如果使用thenCombineAsync,不使用thenCombine,像代码清单16-18中那样,采用第三个Future单独进行商品价格和汇率的乘法运算,效果是几乎相同的。这两种实现看起来没太大区别,原因是你只对两个Future进行了合并。
16.4.6 高效地使用超时机制
16.2.2节曾提到过,读取采用Future计算结果值时,为了避免线程等待结果返回导致的永久阻塞,设定一个超时机制是个不错的主意。Java 9通过CompletableFuture提供了多个方法,可以更加灵活地设置线程的超时机制。orTimeout在指定的超时到达时,会通过ScheduledThreadExecutor线程结束该CompletableFuture对象,并抛出一个TimeoutException异常,它的返回值是一个新的CompletableFuture对象。凭借这一方法,你可以将你的计算流水线串接起来,发生TimeoutException异常时,反馈一个友好的消息给用户。你可以为代码清单16-17中的Future添加超时机制,如果任务没有在3秒钟之内完成就抛出一个TimeoutException异常,代码如下所示。当然,具体超时的时间长短应该与你的业务需求保持一致。
代码清单 16-19 为
CompletableFuture添加超时
Future<Double> futurePriceInUSD =CompletableFuture.supplyAsync(() -> shop.getPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)),(price, rate) -> price * rate)).orTimeout(3, TimeUnit.SECONDS); ←---- 如果任务无法在3秒钟之内执行完毕,Future就抛出一个TimeoutException超时异常。Java 9添加了对异步超时管理的支持
有时,如果服务偶然性地无法及时响应,临时使用默认值继续执行也是一种可接受的解决方案。代码清单16-19中,你期望汇率服务1秒钟之内就能返回欧元到美元的兑换汇率。不过,即便请求耗时更长,你也不希望程序直接抛出一个异常,让之前的计算开销付之东流。这种情况下,你希望程序可以退化为使用预先定义的汇率。通过Java 9新引入的completeOnTimeout方法,你可以轻松地完成这一任务,为程序添加第二种超时机制,代码如下所示。
代码清单 16-20 超时之后,采用默认值继续执行
CompletableFuture
Future<Double> futurePriceInUSD =CompletableFuture.supplyAsync(() -> shop.getPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)).completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS), ←---- 如果汇率服务1秒钟还未返回结果,就使用默认汇率继续执行及计算(price, rate) -> price * rate)).orTimeout(3, TimeUnit.SECONDS);
同orTimeout方法一样,completeOnTimeOut方法也返回一个CompletableFuture,你可以将它与其他的CompletableFuture方法链接起来。简短地回顾一下,目前我们已经能配置两种类型的超时:一种是如果程序执行超时,譬如超过3秒,整个计算都会失败;另一种是如果程序执行超时,譬如超过1秒,还可以使用预定义的默认值继续执行,不会发生失效。
现在,你几乎已经完成了你的“最优价格查询器”应用,然而它还有一点儿欠缺。你希望达到的效果是,一旦拿到商店的价格数据,立刻将它们展示给你的用户(这是汽车保险和机票比价网站常用的做法),而不是像你目前的代码那样,要等到获取了所有数据后才开始展示数据。CompletableFuture自身执行完毕之前,调用它的get或者join方法,执行都会被阻塞。接下来的一节会学习如何通过响应CompletableFuture的completion事件达到及时展示数据这一目标,不再受制于get或者join方法。
