9.7 多个CompletableFuture之间的协调(第2部分)

问题

用户希望通过更复杂的示例了解如何协调多个 CompletableFuture 实例。

方案

在美国职棒大联盟(MLB)赛季的每个比赛日访问官方网站,其中包括指向当日比赛的链接。下载每场比赛的技术统计信息(box score),并将其转换为一个 Java 类。采用异步方式保存数据后计算每场比赛的结果,找出总分最高的比赛,然后打印最高分以及出现最高分的那场比赛。

讨论

较之其他简单的示例,本范例所讨论的应用程序更为复杂。希望读者能从中受到启发,理解如何通过合并多个 CompletableFuture 任务来完成工作。

MLB 官方网站保存了指定比赛日中每场比赛的得分,我们的应用程序即以此为基础。16 以 2017 年 6 月 14 日为例,包括当日所有比赛信息的页面如图 9-1 所示。

16只要了解以下内容,理解本例就不会存在障碍:在棒球比赛中,两队轮流攻守,得分较高的队胜出,比赛的统计数据称为技术统计。

9.7 多个CompletableFuture之间的协调(第2部分) - 图1

图 9-1:2017 年 6 月 14 日进行的比赛

在上述页面中,每个链接指向一场比赛。链接以字母 gid 开头,后跟年、月、日以及主队和客队代码。点击某个链接,跳转后的页面包含一个文件列表,其中有一个名为 boxscore.json 的文件。

我们的应用程序将完成以下任务。

(1) 访问提供指定日期范围内各场比赛信息的网站;

(2) 确定每个页面的比赛链接;

(3)下载每场比赛的 boxscore.json 文件;

(4)将 boxscore.json 文件转换为相应的 Java 对象;

(5) 将下载结果保存到本地文件;

(6) 计算每场比赛的得分;

(7) 检索总分最高的比赛;

(8) 打印所有比赛的得分,以及出现最高得分的比赛及其得分。

可以将大部分任务安排为并发执行,不少任务都能以并行方式运行。

受篇幅所限,无法将完整的程序代码复制到书中,不过读者可以从 GitHub 下载 17。本范例将重点讨论并行流和 CompletableFuture 的应用。

17链接如下:https://github.com/kousen/cfboxscores

第一个难点在于如何找出给定范围内每个比赛日的比赛链接。如例 9-28 所示,GamePageLinksSupplier 类实现了 Supplier 接口,其作用是生成一个表示比赛链接的字符串列表。

例 9-28 获取某个日期范围内的比赛链接

  1. public class GamePageLinksSupplier implements Supplier<List<String>> {
  2. private static final String BASE =
  3. "http://gd2.mlb.com/components/game/mlb/";
  4. private LocalDate startDate;
  5. private int days;
  6.  
  7. public GamePageLinksSupplier(LocalDate startDate, int days) {
  8. this.startDate = startDate;
  9. this.days = days;
  10. }
  11.  
  12. public List<String> getGamePageLinks(LocalDate localDate) {
  13. // 使用Jsoup库解析HTML网页,并提取以"gid"开头的链接
  14. }
  15.  
  16. @Override
  17. public List<String> get() {
  18. return Stream.iterate(startDate, d -> d.plusDays(1))
  19. .limit(days)
  20. .map(this::getGamePageLinks)
  21. .flatMap(list -> list.isEmpty() ? Stream.empty() : list.stream())
  22. .collect(Collectors.toList());
  23. }
  24. }

Supplier> 所需的方法

get 方法使用 Stream.iterate 方法对某个范围内的日期进行迭代:从给定日期开始,逐天递增直至上限。

9.7 多个CompletableFuture之间的协调(第2部分) - 图2 Java 9 为 LocalDate 类引入了 datesUntil 方法,它将生成 Stream。相关讨论请参见范例 10.7。

每个 LocalDate 都成为 getGamePageLinks 方法的参数,它使用 Jsoup 库解析 HTML 页面,并查找所有以 gid 开头的链接,然后以字符串的形式返回这些链接。

接下来,程序通过实现 Function 接口的 BoxscoreRetriever 类来访问每个比赛链接中的 boxscore.json 文件,如例 9-29 所示。

例 9-29 在比赛链接列表中检索技术统计列表

  1. public class BoxscoreRetriever implements Function<List<String>, List<Result>> {
  2. private static final String BASE =
  3. "http://gd2.mlb.com/components/game/mlb/";
  4.  
  5. private OkHttpClient client = new OkHttpClient();
  6. private Gson gson = new Gson();
  7. @SuppressWarnings("ConstantConditions")
  8. public Optional<Result> gamePattern2Result(String pattern) {
  9. // 省略代码
  10. String boxscoreUrl = BASE + dateUrl + pattern + "boxscore.json";
  11.  
  12. // 设置OkHttp库以创建网络调用
  13. try {
  14. // 获取响应
  15. if (!response.isSuccessful()) {
  16. System.out.println("Box score not found for " + boxscoreUrl);
  17. return Optional.empty();
  18. }
  19.  
  20. return Optional.ofNullable(
  21. gson.fromJson(response.body().charStream(), Result.class));
  22. } catch (IOException e) {
  23. e.printStackTrace();
  24. return Optional.empty();
  25. }
  26. }
  27.  
  28. @Override
  29. public List<Result> apply(List<String> strings) {
  30. return strings.parallelStream()
  31. .map(this::gamePattern2Result)
  32. .filter(Optional::isPresent)
  33. .map(Optional::get)
  34. .collect(Collectors.toList());
  35. }
  36. }

❶ 如果由于降雨或其他因素而未能找到技术统计信息,则返回空 Optional

❷ 利用 Gson 库将 JSON 转换为 Result

BoxscoreRetriever 类需要使用 OkHttp 库和 Gson 库以下载 JSON 格式的技术统计信息,并将其转换为 Result 类型的对象。由于 BoxscoreRetriever 类实现了 Function 接口,可以实现 apply 方法,从而将字符串列表转换为结果列表。如果给定的比赛由于降雨而取消,或因为某些原因导致网络连接中断,则可能找不到该场比赛的技术统计。这种情况下,gamePattern2Result 方法将返回一个为空的 Optional

apply 方法读取各场比赛的链接,将它们转换为相应的 Optional。接下来,apply 方法对流进行筛选,仅传递非空的 Optional 实例,然后在这些 Optional 实例上调用 get 方法,最后将它们收集到结果列表。

9.7 多个CompletableFuture之间的协调(第2部分) - 图3 Java 9 为 Optional 类引入了 stream 方法,它可以将 filter(Optional::isPresent)map(Optional::get) 简化为 flatMap(Optional::stream)。相关讨论请参见范例 10.6。

检索到技术统计信息后可以将其保存为本地文件,如例 9-30 所示。

例 9-30 将每场比赛的技术统计信息保存为文件

  1. private void saveResultList(List<Result> results) {
  2. results.parallelStream().forEach(this::saveResultToFile);
  3. }
  4.  
  5. public void saveResultToFile(Result result) {
  6. // 根据比赛日期和队名确定文件名
  7. try {
  8. File file = new File(dir + "/" + fileName);
  9. Files.write(file.toPath().toAbsolutePath(),
  10. gson.toJson(result).getBytes());
  11. } catch (IOException e) {
  12. e.printStackTrace();
  13. }
  14. }

➊ 创建或覆盖文件,然后将其关闭

如果文件不存在,Files.write 方法(使用默认参数)将创建一个新文件,否则覆盖原有文件。创建或覆盖文件后将其关闭。

程序还使用其他两种后期处理方法:getMaxScore 用于确定某场给定比赛的最高总分,而 getMaxGame 将返回出现最高分的那场比赛。两种方法的应用如例 9-31 所示。

例 9-31 获取最高总分以及出现最高分的比赛

  1. private int getTotalScore(Result result) {
  2. // 两队得分之和
  3. }
  4.  
  5. public OptionalInt getMaxScore(List<Result> results) {
  6. return results.stream()
  7. .mapToInt(this::getTotalScore)
  8. .max();
  9. }
  10.  
  11. public Optional<Result> getMaxGame(List<Result> results) {
  12. return results.stream()
  13. .max(Comparator.comparingInt(this::getTotalScore));
  14. }

最后,通过 CompletableFuture 将前面讨论的所有方法与类合并在一起。主程序代码如例 9-32 所示。

例 9-32 主程序代码

  1. public void printGames(LocalDate startDate, int days) {
  2. CompletableFuture<List<Result>> future =
  3. CompletableFuture.supplyAsync(
  4. new GamePageLinksSupplier(startDate, days))
  5. .thenApply(new BoxscoreRetriever());
  6.  
  7. CompletableFuture<Void> futureWrite =
  8. future.thenAcceptAsync(this::saveResultList)
  9. .exceptionally(ex -> {
  10. System.err.println(ex.getMessage());
  11. return null;
  12. });
  13.  
  14. CompletableFuture<OptionalInt> futureMaxScore =
  15. future.thenApplyAsync(this::getMaxScore);
  16. CompletableFuture<Optional<Result>> futureMaxGame =
  17. future.thenApplyAsync(this::getMaxGame);
  18. CompletableFuture<String> futureMax =
  19. futureMaxScore.thenCombineAsync(futureMaxGame,
  20. (score, result) ->
  21. String.format("Highest score: %d, Max Game: %s",
  22. score.orElse(0), result.orElse(null)));
  23.  
  24. CompletableFuture.allOf(futureWrite, futureMax).join();
  25.  
  26. future.join().forEach(System.out::println);
  27. System.out.println(futureMax.join());
  28. }

❶ 检索技术统计信息的协调任务

❷ 保存为文件,如果出现问题则异常完成

❸ 合并最高总分与出现最高分的比赛这两个任务

❹ 完成所有任务

可以看到,程序创建了多个 CompletableFuture 实例。第一个 CompletableFuture 实例使用 GamePageLinksSupplier 类检索指定日期内所有比赛的页面链接,然后通过 BoxscoreRetriever 类将这些链接转换为结果。第二个 CompletableFuture 实例设置将结果写入磁盘,如果出现问题则异常完成。两个后期处理方法 getMaxScoregetMaxGame 分别用于查找最高总分以及出现最高分的那场比赛,18 而 allOf 方法用于完成所有任务。最后,程序打印相应的结果。

18这两项操作显然可以一起完成,程序将二者分开是为了展示 thenCombine 的应用。

注意 thenApplyAsync 方法的应用。该方法并非必需,但能使任务以异步方式运行。

如果需要检索 2017 年 5 月 5 日到 5 月 7 日三天的技术统计信息,请使用以下语句:

  1. GamePageParser parser = new GamePageParser();
  2. parser.printGames(LocalDate.of(2017, Month.MAY, 5), 3);

输出结果如下:

  1. Box score not found for Los Angeles at San Diego on May 5, 2017
  2. May 5, 2017: Arizona Diamondbacks 6, Colorado Rockies 3
  3. May 5, 2017: Boston Red Sox 3, Minnesota Twins 4
  4. May 5, 2017: Chicago White Sox 2, Baltimore Orioles 4
  5. // 更多数据
  6. May 7, 2017: Toronto Blue Jays 2, Tampa Bay Rays 1
  7. May 7, 2017: Washington Nationals 5, Philadelphia Phillies 6
  8. Highest score: 23, Max Game: May 7, 2017: Boston Red Sox 17, Minnesota Twins 6

希望本范例能对读者有所启发,通过综合运用本书介绍的各种知识,包括 Future 任务(使用 CompletableFuture)、函数式接口(如 SupplierFunction)、类(如 OptionalStreamLocalDate)以及方法(如 mapfilterflatMap),掌握如何解决复杂而有趣的问题。

另见

有关多个 CompletableFuture 之间的协调请参见范例 9.6。