9.6 多个CompletableFuture之间的协调(第1部分)
问题
用户希望在一个 Future 完成之后触发另一个动作(action)。
方案
使用 CompletableFuture 类中用于协调动作的各种实例方法,如 thenApply、thenCompose、thenRun 等。
讨论
CompletableFuture 类的最大优势在于能很容易地将多个 Future 链接起来。我们可以创建多个 Future 以表示需要执行的各种任务,然后通过一个 Future 的完成来触发另一个 Future 的执行,达到协调动作的目的。
我们考虑如何实现下面这个简单的任务:
- 向
Supplier请求一个包含数字的字符串 - 将数字解析为整数
- 将整数倍增
- 打印倍增后的结果
相关实现如例 9-23 所示,程序并不复杂。
例 9-23 利用
CompletableFuture协调多个任务
- private String sleepThenReturnString() {
- try {
- Thread.sleep(100); ➊
- } catch (InterruptedException ignored) {
- }
- return "42";
- }
- CompletableFuture.supplyAsync(this::sleepThenReturnString)
- .thenApply(Integer::parseInt) ➋
- .thenApply(x -> 2 * x) ➋
- .thenAccept(System.out::println) ➌
- .join(); ➍
- System.out.println("Running...");
❶ 人为引入延迟
❷ 在前一阶段完成后应用 Function
❸ 在前一阶段完成后应用 Consumer
❹ 检索完成的结果
由于对 join 的调用属于阻塞调用,执行上述程序将输出 84 并后跟“Running…”。supplyAsync 方法传入 Supplier(本例为 String 类型)。thenApply 方法传入 Function,其输入参数为前一个 CompletionStage 的结果。其中第一个 thenApply 方法中的函数将字符串转换为一个整数,第二个 thenApply 方法中的函数将整数倍增。最后,thenAccept 方法传入 Consumer,在前一个阶段完成后执行。
CompletableFuture 类定义了多种不同的协调方法,完整列表(不包括重载形式,将在之后讨论)如表 9-1 所示。
表9-1:CompletableFuture类定义的协调方法
| 修饰符 | 返回类型 | 方法名 | 参数 |
|---|---|---|---|
CompletableFuture
|
acceptEither
|
CompletionStage other, Consumer action
| |
static
|
CompletableFuture
|
allOf
|
CompletableFuture… cfs
|
static
|
CompletableFuture
|
anyOf
|
CompletableFuture… cfs
|
|
CompletableFuture
|
applyToEither
|
CompletionStage other, Function fn
|
CompletableFuture
|
runAfterBoth
|
CompletionStage other, Runnable action
| |
CompletableFuture
|
thenAccept
|
Consumer action
| |
|
CompletableFuture
|
thenApply
|
Function action, ? extends U> fn
|
|
CompletableFuture
|
thenCombine
|
CompletionStage other, BiFunction fn
|
|
CompletableFuture
|
thenCompose
|
Function> fn
|
CompletableFuture
|
thenRun
|
Runnable action
| |
CompletableFuture
|
whenComplete
|
BiConsumer action
|
上表列出的所有方法均使用工作者线程(worker thread)的通用 ForkJoinPool,其大小与处理器数量相等。前面已经讨论过工厂方法 runAsync 和 supplyAsync,二者指定 Runnable 或 Supplier,并返回 CompletableFuture。如上表所示,可以通过链接其他方法(如 thenApply 或 thenCompose)来添加将在前一个任务完成后开始执行的任务。
表 9-1 并未列出每种方法包含的其他两种模式,二者以 Async 结尾,一种传入 Executor,而另一种不传入。以 thenAccept 方法为例,其完整形式如下:
- CompletableFuture<Void> thenAccept(Consumer<? super T> action)
- CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
- CompletableFuture<Void> thenAcceptAsync(
- Consumer<? super T> action, Executor executor)
第一种形式在与原有任务相同的线程中执行其 Consumer 参数;第二种形式将 Consumer 再次提交给线程池;第三种形式提供一个 Executor,用于运行任务而非通用 fork/join 线程池。
是否采用这些方法的
Async形式应视情况而定。尽管异步操作能加快单个任务的执行速度,但引入的开销可能无助于整体速度的提升。
由于 ExecutorService 接口实现了 Executor 接口,我们也可以使用自定义 Executor 而不是通用线程池。例 9-24 显示了利用单独的线程池执行 CompletableFuture 任务。
例 9-24 在单独的线程池中运行
CompletableFuture任务
ExecutorService service = Executors.newFixedThreadPool(4);CompletableFuture.supplyAsync(this::sleepThenReturnString, service) ➊.thenApply(Integer::parseInt).thenApply(x -> 2 * x).thenAccept(System.out::println).join();System.out.println("Running...");
➊ 提供单独的线程池作为参数
在本例中,thenApply 和 thenAccept 方法使用的线程与 supplyAsync 方法相同。不过,如果使用 thenApplyAsync 方法,那么除非添加另一个线程池作为附加参数,否则任务将被提交到线程池。
在通用 ForkJoinPool 中等待完成
默认情况下,CompletableFuture 使用所谓的“通用” fork/join 线程池,后者是一种经过优化并执行工作窃取(work stealing)的线程池。根据 Javadoc 的描述,这种线程池中的所有线程“尝试查找并执行提交到线程池或由其他活动任务创建的任务”。需要注意的是,所有工作者线程均为守护线程(daemon thread):如果在线程结束前退出程序,线程将终止。
换言之,在执行例 9-23 所示的代码时,如果没有调用
join方法,程序将只打印“Running…”,而不会输出Future的结果,系统在任务完成前即告终止。可以采用两种方案解决这个问题。一种方案是调用
get或join方法,二者将阻塞调用进程,直至检索到结果。另一种方案是为通用线程池设置一个超时时间(time-out period),告诉程序等待直至所有线程执行完毕:
- ForkJoinPool.commonPool().awaitQuiescence(long timeout, TimeUnit unit)
如果将线程池的等待周期设置得足够长,
Future就会完成。awaitQuiescence方法用于通知系统等待,直至所有工作者线程空闲,或所设置的超时时间结束(以先到者为准)。
对于返回值的 CompletableFuture 实例,可以通过 get 或 join 方法检索该值。两种方法属于阻塞调用,直至 Future 完成或抛出异常。不同之处在于,get 方法抛出的是受检异常 ExecutionException,而 join 方法抛出的是非受检异常 CompletionException,因此在 lambda 表达式中更容易使用 join 方法。
cancel 方法用于取消 CompletableFuture,该方法传入一个布尔值作为参数:
- boolean cancel(boolean mayInterruptIfRunning)
如果 CompletableFuture 尚未完成,cancel 方法将利用 CancellationException 使其完成,所有依赖的 CompletableFuture 也会由于 CancellationException 引发的 CompletionException 而异常完成。此时,布尔参数不执行任何操作。14
14有趣的是,根据 Javadoc 的描述,参数 mayInterruptIfRunning“不起作用,因为中断不用于控制处理”。
之前的示例介绍了 thenApply 和 thenAccept 方法的应用。而 thenCompose 是一种实例方法,可以将某个 Future 链接到原有 Future,使得第一个 Future 的结果也能用于第二个 Future。thenCompose 方法的应用如例 9-25 所示,这可能是实现两个数相加的最复杂的程序了。
例 9-25 两个
Future的复合
- @Test
- public void compose() throws Exception {
- int x = 2;
- int y = 3;
- CompletableFuture<Integer> completableFuture =
- CompletableFuture.supplyAsync(() -> x)
- .thenCompose(n -> CompletableFuture.supplyAsync(() -> n + y));
assertTrue(5 == completableFuture.get());}
thenCompose 方法的参数是一个函数,它传入第一个 Future 的结果,并将其转换为第二个 Future 的输出。不过,如果希望两个 Future 彼此独立,可以改用 thenCombine 方法,如例 9-26 所示。15
15好吧,这同样可能是实现两个数字相加的最复杂的程序。
例 9-26 两个
Future的合并
- @Test
- public void combine() throws Exception {
- int x = 2;
- int y = 3;
- CompletableFuture<Integer> completableFuture =
- CompletableFuture.supplyAsync(() -> x)
- .thenCombine(CompletableFuture.supplyAsync(() -> y),
- (n1, n2) -> n1 + n2);
assertTrue(5 == completableFuture.get());}
thenCombine 方法传入 Future 和 BiFunction 作为参数。计算结果时,两个 Future 的结果都能在函数中使用。
CompletableFuture 类还定义了一种名为 handle 的方法,其签名如下:
- <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
BiFunction 的两个输入参数为 Future 的结果(正常完成)和抛出的异常(异常结束),返回的参数由程序决定。handle 方法也有两种 Async 模式,一种传入 BiFunction,另一种传入 BiFunction 和 Executor。handle 方法的应用如例 9-27 所示。
例 9-27
handle方法的应用
- private CompletableFuture<Integer> getIntegerCompletableFuture(String num) {
- return CompletableFuture.supplyAsync(() -> Integer.parseInt(num))
- .handle((val, exc) -> val != null ? val : 0);
- }
- @Test
- public void handleWithException() throws Exception {
- String num = "abc";
- CompletableFuture<Integer> value = getIntegerCompletableFuture(num);
- assertTrue(value.get() == 0);
- }
- @Test
- public void handleWithoutException() throws Exception {
- String num = "42";
- CompletableFuture<Integer> value = getIntegerCompletableFuture(num);
- assertTrue(value.get() == 42);
- }
本例解析字符串以查找相应的整数。解析成功则返回整数,解析失败则抛出 ParseException,handle 方法返回 0。上述两个测试表明,handle 方法能正确处理这两种情况。
从上面这些示例不难看到,可以通过多种方式在通用线程池或自定义执行器中同步或异步地合并多个任务。范例 9.7 将给出一个更复杂的示例,讨论如何协调多个 CompletableFuture。
另见
有关如何协调多个 CompletableFuture,范例 9.7 给出了一个更复杂的示例。
是否采用这些方法的 