9.7 多个CompletableFuture之间的协调(第2部分)
问题
用户希望通过更复杂的示例了解如何协调多个 CompletableFuture 实例。
方案
在美国职棒大联盟(MLB)赛季的每个比赛日访问官方网站,其中包括指向当日比赛的链接。下载每场比赛的技术统计信息(box score),并将其转换为一个 Java 类。采用异步方式保存数据后计算每场比赛的结果,找出总分最高的比赛,然后打印最高分以及出现最高分的那场比赛。
讨论
较之其他简单的示例,本范例所讨论的应用程序更为复杂。希望读者能从中受到启发,理解如何通过合并多个 CompletableFuture 任务来完成工作。
MLB 官方网站保存了指定比赛日中每场比赛的得分,我们的应用程序即以此为基础。16 以 2017 年 6 月 14 日为例,包括当日所有比赛信息的页面如图 9-1 所示。
16只要了解以下内容,理解本例就不会存在障碍:在棒球比赛中,两队轮流攻守,得分较高的队胜出,比赛的统计数据称为技术统计。

图 9-1:2017 年 6 月 14 日进行的比赛
在上述页面中,每个链接指向一场比赛。链接以字母 gid 开头,后跟年、月、日以及主队和客队代码。点击某个链接,跳转后的页面包含一个文件列表,其中有一个名为 boxscore.json 的文件。
我们的应用程序将完成以下任务。
(1) 访问提供指定日期范围内各场比赛信息的网站;
(2) 确定每个页面的比赛链接;
(3)下载每场比赛的 boxscore.json 文件;
(4)将 boxscore.json 文件转换为相应的 Java 对象;
(5) 将下载结果保存到本地文件;
(6) 计算每场比赛的得分;
(7) 检索总分最高的比赛;
(8) 打印所有比赛的得分,以及出现最高得分的比赛及其得分。
可以将大部分任务安排为并发执行,不少任务都能以并行方式运行。
受篇幅所限,无法将完整的程序代码复制到书中,不过读者可以从 GitHub 下载 17。本范例将重点讨论并行流和 CompletableFuture 的应用。
17链接如下:https://github.com/kousen/cfboxscores。
第一个难点在于如何找出给定范围内每个比赛日的比赛链接。如例 9-28 所示,GamePageLinksSupplier 类实现了 Supplier 接口,其作用是生成一个表示比赛链接的字符串列表。
例 9-28 获取某个日期范围内的比赛链接
- public class GamePageLinksSupplier implements Supplier<List<String>> {
- private static final String BASE =
- "http://gd2.mlb.com/components/game/mlb/";
- private LocalDate startDate;
- private int days;
- public GamePageLinksSupplier(LocalDate startDate, int days) {
- this.startDate = startDate;
- this.days = days;
- }
- public List<String> getGamePageLinks(LocalDate localDate) {
- // 使用Jsoup库解析HTML网页,并提取以"gid"开头的链接
- }
- @Override
- public List<String> get() { ➊
- return Stream.iterate(startDate, d -> d.plusDays(1))
- .limit(days)
- .map(this::getGamePageLinks)
- .flatMap(list -> list.isEmpty() ? Stream.empty() : list.stream())
- .collect(Collectors.toList());
- }
- }
➊ Supplier 所需的方法>
get 方法使用 Stream.iterate 方法对某个范围内的日期进行迭代:从给定日期开始,逐天递增直至上限。
Java 9 为
LocalDate类引入了datesUntil方法,它将生成Stream。相关讨论请参见范例 10.7。
每个 LocalDate 都成为 getGamePageLinks 方法的参数,它使用 Jsoup 库解析 HTML 页面,并查找所有以 gid 开头的链接,然后以字符串的形式返回这些链接。
接下来,程序通过实现 Function 接口的 BoxscoreRetriever 类来访问每个比赛链接中的 boxscore.json 文件,如例 9-29 所示。
例 9-29 在比赛链接列表中检索技术统计列表
- public class BoxscoreRetriever implements Function<List<String>, List<Result>> {
- private static final String BASE =
- "http://gd2.mlb.com/components/game/mlb/";
- private OkHttpClient client = new OkHttpClient();
- private Gson gson = new Gson();
- @SuppressWarnings("ConstantConditions")
- public Optional<Result> gamePattern2Result(String pattern) {
- // 省略代码
- String boxscoreUrl = BASE + dateUrl + pattern + "boxscore.json";
- // 设置OkHttp库以创建网络调用
- try {
- // 获取响应
- if (!response.isSuccessful()) {
- System.out.println("Box score not found for " + boxscoreUrl);
- return Optional.empty(); ➊
- }
- return Optional.ofNullable(
- gson.fromJson(response.body().charStream(), Result.class)); ➋
- } catch (IOException e) {
- e.printStackTrace();
- return Optional.empty(); ➊
- }
- }
- @Override
- public List<Result> apply(List<String> strings) {
- return strings.parallelStream()
- .map(this::gamePattern2Result)
- .filter(Optional::isPresent)
- .map(Optional::get)
- .collect(Collectors.toList());
- }
- }
❶ 如果由于降雨或其他因素而未能找到技术统计信息,则返回空 Optional
❷ 利用 Gson 库将 JSON 转换为 Result
BoxscoreRetriever 类需要使用 OkHttp 库和 Gson 库以下载 JSON 格式的技术统计信息,并将其转换为 Result 类型的对象。由于 BoxscoreRetriever 类实现了 Function 接口,可以实现 apply 方法,从而将字符串列表转换为结果列表。如果给定的比赛由于降雨而取消,或因为某些原因导致网络连接中断,则可能找不到该场比赛的技术统计。这种情况下,gamePattern2Result 方法将返回一个为空的 Optional。
apply 方法读取各场比赛的链接,将它们转换为相应的 Optional。接下来,apply 方法对流进行筛选,仅传递非空的 Optional 实例,然后在这些 Optional 实例上调用 get 方法,最后将它们收集到结果列表。
Java 9 为
Optional类引入了stream方法,它可以将filter(Optional::isPresent)和map(Optional::get)简化为flatMap(Optional::stream)。相关讨论请参见范例 10.6。
检索到技术统计信息后可以将其保存为本地文件,如例 9-30 所示。
例 9-30 将每场比赛的技术统计信息保存为文件
- private void saveResultList(List<Result> results) {
- results.parallelStream().forEach(this::saveResultToFile);
- }
- public void saveResultToFile(Result result) {
- // 根据比赛日期和队名确定文件名
- try {
- File file = new File(dir + "/" + fileName);
- Files.write(file.toPath().toAbsolutePath(), ➊
- gson.toJson(result).getBytes());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
➊ 创建或覆盖文件,然后将其关闭
如果文件不存在,Files.write 方法(使用默认参数)将创建一个新文件,否则覆盖原有文件。创建或覆盖文件后将其关闭。
程序还使用其他两种后期处理方法:getMaxScore 用于确定某场给定比赛的最高总分,而 getMaxGame 将返回出现最高分的那场比赛。两种方法的应用如例 9-31 所示。
例 9-31 获取最高总分以及出现最高分的比赛
- private int getTotalScore(Result result) {
- // 两队得分之和
- }
- public OptionalInt getMaxScore(List<Result> results) {
- return results.stream()
- .mapToInt(this::getTotalScore)
- .max();
- }
- public Optional<Result> getMaxGame(List<Result> results) {
- return results.stream()
- .max(Comparator.comparingInt(this::getTotalScore));
- }
最后,通过 CompletableFuture 将前面讨论的所有方法与类合并在一起。主程序代码如例 9-32 所示。
例 9-32 主程序代码
- public void printGames(LocalDate startDate, int days) {
- CompletableFuture<List<Result>> future =
- CompletableFuture.supplyAsync(
- new GamePageLinksSupplier(startDate, days))
- .thenApply(new BoxscoreRetriever()); ➊
- CompletableFuture<Void> futureWrite =
- future.thenAcceptAsync(this::saveResultList) ➋
- .exceptionally(ex -> {
- System.err.println(ex.getMessage());
- return null;
- });
- CompletableFuture<OptionalInt> futureMaxScore =
- future.thenApplyAsync(this::getMaxScore);
- CompletableFuture<Optional<Result>> futureMaxGame =
- future.thenApplyAsync(this::getMaxGame);
- CompletableFuture<String> futureMax =
- futureMaxScore.thenCombineAsync(futureMaxGame, ➌
- (score, result) ->
- String.format("Highest score: %d, Max Game: %s",
- score.orElse(0), result.orElse(null)));
- CompletableFuture.allOf(futureWrite, futureMax).join(); ➍
- future.join().forEach(System.out::println);
- System.out.println(futureMax.join());
- }
❶ 检索技术统计信息的协调任务
❷ 保存为文件,如果出现问题则异常完成
❸ 合并最高总分与出现最高分的比赛这两个任务
❹ 完成所有任务
可以看到,程序创建了多个 CompletableFuture 实例。第一个 CompletableFuture 实例使用 GamePageLinksSupplier 类检索指定日期内所有比赛的页面链接,然后通过 BoxscoreRetriever 类将这些链接转换为结果。第二个 CompletableFuture 实例设置将结果写入磁盘,如果出现问题则异常完成。两个后期处理方法 getMaxScore 和 getMaxGame 分别用于查找最高总分以及出现最高分的那场比赛,18 而 allOf 方法用于完成所有任务。最后,程序打印相应的结果。
18这两项操作显然可以一起完成,程序将二者分开是为了展示 thenCombine 的应用。
注意 thenApplyAsync 方法的应用。该方法并非必需,但能使任务以异步方式运行。
如果需要检索 2017 年 5 月 5 日到 5 月 7 日三天的技术统计信息,请使用以下语句:
- GamePageParser parser = new GamePageParser();
- parser.printGames(LocalDate.of(2017, Month.MAY, 5), 3);
输出结果如下:
Box score not found for Los Angeles at San Diego on May 5, 2017May 5, 2017: Arizona Diamondbacks 6, Colorado Rockies 3May 5, 2017: Boston Red Sox 3, Minnesota Twins 4May 5, 2017: Chicago White Sox 2, Baltimore Orioles 4// 更多数据May 7, 2017: Toronto Blue Jays 2, Tampa Bay Rays 1May 7, 2017: Washington Nationals 5, Philadelphia Phillies 6Highest score: 23, Max Game: May 7, 2017: Boston Red Sox 17, Minnesota Twins 6
希望本范例能对读者有所启发,通过综合运用本书介绍的各种知识,包括 Future 任务(使用 CompletableFuture)、函数式接口(如 Supplier 和 Function)、类(如 Optional、Stream 与 LocalDate)以及方法(如 map、filter 与 flatMap),掌握如何解决复杂而有趣的问题。
另见
有关多个 CompletableFuture 之间的协调请参见范例 9.6。
