17.2 反应式流以及Flow API

反应式编程是一种利用反应式流的编程技术。而反应式流是以异步方式处理潜在无边界数据流的标准技术(它基于“发布–订阅”模型,也叫pub-sub,更详细的介绍请参考第15章的内容),其处理时按先后次序进行,并且带有不可阻塞的背压。背压是发表–订阅模式下的一种常见的流量控制机制,目的是避免流中事件处理的消费者由于处理速度较慢,被一个或多个快速的生产者压垮。出现这种情况时,如果受压组件发生灾难式的崩溃,或者以无法控制的方式丢弃事件都是不可接受的。组件需要一种方式来向上游生产者反馈,让它们减缓生产速度,或者告诉生产者它在接收更多数据之前,在给定的时间内能够接受和处理多少事件。

值得一提的是背压的这些内置要求是由流处理天然的异步特质决定的。实际上,执行同步调用时,系统默认就会收到来自阻塞API的背压。遭遇这种不幸的场景时,你将无法执行任何任务,直到阻塞操作完成,因此,由于等待你会浪费大量的资源。与之相反,使用异步式API,硬件资源的使用率能够大幅提高,甚至达到其极限,不过你可能由此压垮下游处理速度较慢的组件。引入背压或者流量控制机制的目的就是解决这一问题,它们提供了一种协议,可以在不阻塞线程的情况下,避免数据接收方被压垮。

反应式的这些需求和行为都汇集浓缩到了反应式流(Reactive Streams)1项目中。这个项目的成员来自于奈飞(Netflix)、红帽(Red Hat)、Twitter、Lightbend等公司。依据这些需求和行为,反应式流项目定义了实现任何反应式流都必须提供的四个相互关联的接口。这些接口现在是Java 9语言的组成部分,由新的java.util.concurrent.Flow类提供。很多第三方库,包括Akka流(Lightbend公司)、Reactor(Pivotal公司),以及Vert.x(红帽公司)都提供了这些接口的实现。接下来的一节会介绍这些接口声明方法的细节,并讨论如何在反应式组件中使用它们。

1书中我们使用了大写开头的Reactive Streams,解释概念时则可以采用小写的reactive streams。

17.2.1 Flow

Java 9为了支持反应式编程新增了一个类:java.util.concurrent.Flow。这个类只包含一个静态组件,无法实例化。Flow类包含了四个嵌套的接口来体现反应式项目定义的标准“发布–订阅”模型,分别是:

  • 发布者(Publisher);
  • 订阅者(Subscriber);
  • 订阅(Subscription);
  • 处理者(Processor)。

凭借Flow类,相互关联的接口或者静态方法可以构造流控(flow-controlled)组件。Publisher生产的元素可以被一个或多个Subscriber消费,PublisherSubscriber之间的关系通过Subscription管理。Publisher是顺序事件的提供者,并且这些事件的数量可能没有上限,不过它也受背压机制的制约,按照Subscriber的反馈进行元素的生产。Publisher是一个Java函数式接口(它仅仅声明了一个抽象方法),Subscriber可以把自己注册为该事件的监听方从而完成对Publisher事件的注册。流量控制,包括PublisherSubscriber之间的背压都是由Subscription管理的。这三个接口以及Processor接口的定义可以参考代码清单17-1、代码清单17-2、代码清单17-3以及代码清单17-4。

代码清单 17-1 Flow.Publisher接口

  1. @FunctionalInterface
  2. public interface Publisher<T> {
  3. void subscribe(Subscriber<? super T> s);
  4. }

此外,Subscriber接口提供了四个回调函数,这些回调函数会在Publisher生产对应事件时被调用。

代码清单 17-2 Flow.Subscriber接口

  1. public interface Subscriber<T> {
  2. void onSubscribe(Subscription s);
  3. void onNext(T t);
  4. void onError(Throwable t);
  5. void onComplete();
  6. }

这些事件的发布(以及对应方法的调用)都必须严格遵守下面协议定义的顺序:

  1. onSubscribe onNext* (onError | onComplete)?

2

2此表示法表示onsubscribe调用。——译者注

这种表示法的含义是onSubscribe方法始终作为第一个事件被调用,接下来是任意多个onNext方法的调用。事件流的处理可能持续不断,也可能借由onComplete回调方法终止,表面接下来没有更多需要处理的元素了,抑或如果Publisher发生了失效,就会执行onError调用(可以对比从终端正常读取一个字符串,或者读取到文件末尾,或者发生I/O错误的情况)。

SubscriberPublisher注册时,Publisher的第一个动作就是调用onSubscribe方法并回传一个Subscription对象。Subscription接口定义了两个方法。Subscriber可以使用第一个方法通知Publisher它已经准备好接收多少个事件,第二个方法用于取消Subscription,因此它的作用就是告诉Publisher它已经不再希望接收来自Publisher的事件了。

代码清单 17-3 Flow.Subscription接口

  1. public interface Subscription {
  2. void request(long n);
  3. void cancel();
  4. }

Java 9的Flow规范定义了一系列的规则,通过这些规则,协议的接口之间能相互沟通协调。下面总结了这些规则的内容。

  • Publisher发送给Subscriber的元素数量不能超过其在Subscriptionrequest方法中指定的数目。不过如果SubscriptiononComplete方法成功地终止,或者Subscription执行过程中发生了错误,调用了onError方法,Publisher也可能还没达到设定的数量就停止调用onNextSubscriber发送元素了。发生这种情况,Subscription就变成了终止状态(即onComplete或者onError),Publisher无法再向Subscriber发送任何信号,对应的Subscription只能被看作取消了。
  • Subscriber必须告知Publisher它是否已经准备好接收数据以及能够处理多少元素。凭借这种方式,SubscriberPublisher执行了“背压”操作,有效地避免了Subscriber被超载数据压垮的情况发生。此外,执行onComplete或者onError操作时,Subscriber不能再次调用Publisher或者Subscription中的方法,这个时刻的Subscription已经被取消了。最后,发出Subscription.cancel()调用后,即使还未执行Subscription.request()方法,也没有通过onNext接收到任何消息,Subscriber也要准备好进行终止操作。
  • Subscription只能被一对PublisherSubscriber共享,这代表了它们之间独一无二的关系。基于这个原因,Subscriber可以从onSubscribeonNext方法中以异步方式调用它的request方法。标准还规定了Subscription.cancel()方法的实现必须是幂等(即调用它一次与重复调用多次的效果是同样的)和线程安全的,这样才能保证执行完第一次调用后,任何对Subscription的额外调用都不会有副作用。执行Subscription.cancel()调用后,Publisher会彻底删除对应Subscriber的引用。规则不推荐大家重复订阅同一个Subscriber,但是它并没有强制发生这种情况时抛出异常,因为所有之前取消的Subscription都需要妥善地保存下来。

图17-3展示了一个典型应用的生命周期,它实现了Flow API中定义的接口。

17.2 反应式流以及Flow API - 图1

图 17-3 使用Flow API的反应式应用的生命周期

Flow类的第4个也是最后一个成员是Processor接口。它同时继承了PublisherSubscriber,但没有额外添加新的方法。

代码清单 17-4 Flow.Processor接口

  1. public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }

实际上,这个接口反映的就是反应式流中事件的转化阶段。接收到错误时,Processor可以选择从出错状态恢复(接着需要将该Subscription设置为取消状态),或者直接向Subscriber抛出onError信号。当最后一个Subscriber取消其Subscription时,Processor也应该取消其上游的Subscription以传递该取消信号(尽管规范中并未严格规定此时一定要执行这样的取消操作)。

Java 9的Flow API或者反应式流API规定所有Subscriber接口的方法实现都不得阻塞Publisher,但是它并未指定这些方法一定要采用同步或者异步的方式。然而,请注意一点,这些接口中定义的所有方法都返回void,从而确保它们能以完全异步的方式实现。

接下来的一节会通过一个简单又实用的例子将已经学习到的内容运用起来。

17.2.2 创建你的第一个反应式应用

大多数情况下,不建议直接去实现Flow类中定义的接口。非比寻常地,Java 9库也并未提供实现它们的类。这些接口的实现借由前面提到的反应式库(譬如Akka、RxJava等)完成。Java 9的java.util.concurrency.Flow规范既是所有实现该接口的库需要遵守的合约,也是使构建于不同的反应式库之上的应用间能相互协调、相互理解沟通的通用语言。此外,反应式库一般都提供了更丰富的特性(除了由java.util.concurrency.Flow接口定义的最小功能集外,它们往往提供了更多对反应式流进行转换和归并的类和方法)。

正如前文所述,直接基于Java 9的Flow API创建你的第一个反应式应用对于理解这四个接口之间是如何工作的非常有价值。到本节结束,你将会基于反应式原则创建一个简单的温度汇报程序。这个程序包含两个组件,分别是:

  • TempInfo,它模拟一个远程温度计(持续不断地回报温度,温度的值是随机生成的,介于华氏0度到99度之间,这也是适合大多数美国城市的温度区间);
  • TempSubscriber,它监听这些温度报告事件,并打印输出某个城市的温度监控器返回的温度Stream。

我们要做的第一步是定义一个简单的类来描述当前汇报的温度,如下面的代码清单所示。

代码清单 17-5 表示当前汇报温度的Java Bean

  1. import java.util.Random;
  2. public class TempInfo {
  3. public static final Random random = new Random();
  4. private final String town;
  5. private final int temp;
  6. public TempInfo(String town, int temp) {
  7. this.town = town;
  8. this.temp = temp;
  9. }
  10. public static TempInfo fetch(String town) { ←---- 城市的TempInfo实例都通过静态工厂方法创建
  11. if (random.nextInt(10) == 0) ←---- 获取当前温度,每十次获取操作可能随机失败一次
  12. throw new RuntimeException("Error!");
  13. return new TempInfo(town, random.nextInt(100)); ←---- 返回温度,其值是介于华氏0度到99度之间的一个随机数
  14. }
  15. @Override
  16. public String toString() {
  17. return town + " : " + temp;
  18. }
  19. public int getTemp() {
  20. return temp;
  21. }
  22. public String getTown() {
  23. return town;
  24. }
  25. }

定义好这个简单的领域模型之后,你就可以开始着手实现某个城市温度的Subscription了,它会在Subscriber请求温度报告时返回对应的数据。下面是实现这段逻辑的代码。

代码清单 17-6 Subscription接口实现,向Subscriber发送TempInfo Stream

  1. import java.util.concurrent.Flow.*;
  2. public class TempSubscription implements Subscription {
  3. private final Subscriber<? super TempInfo> subscriber;
  4. private final String town;
  5. public TempSubscription( Subscriber<? super TempInfo> subscriber,
  6. String town ) {
  7. this.subscriber = subscriber;
  8. this.town = town;
  9. }
  10. @Override
  11. public void request( long n ) {
  12. for (long i = 0L; i < n; i++) { ←---- Subscriber每处理一个请求执行一次循环
  13. try {
  14. subscriber.onNext( TempInfo.fetch( town ) ); ←---- 将当前温度发送给Subscriber
  15. } catch (Exception e) {
  16. subscriber.onError( e ); ←---- 查询温度时如果发送失效,将出错信息返回给Subscriber
  17. break;
  18. }
  19. }
  20. }
  21. @Override
  22. public void cancel() {
  23. subscriber.onComplete(); ←---- 如果Subscription被取消了,那么向Subscriber发送一个完成(onComplete)信号
  24. }
  25. }

接下来一步是创建Subscriber,每当它从Subscription拿到一个新元素,就打印输出温度,并继续请求新的数据,实现代码如下。

代码清单 17-7 Subscriber接口实现,打印输出收到的温度数据

  1. import java.util.concurrent.Flow.*;
  2. public class TempSubscriber implements Subscriber<TempInfo> {
  3. private Subscription subscription;
  4. @Override
  5. public void onSubscribe( Subscription subscription ) { ←---- 保存Subscription并发送第一个请求
  6. this.subscription = subscription;
  7. subscription.request( 1 );
  8. }
  9. @Override
  10. public void onNext( TempInfo tempInfo ) { ←---- 打印输出接收到的温度数据并发送下一个数据请求
  11. System.out.println( tempInfo );
  12. subscription.request( 1 );
  13. }
  14. @Override
  15. public void onError( Throwable t ) { ←---- 发生错误时,打印出错信息
  16. System.err.println(t.getMessage());
  17. }
  18. @Override
  19. public void onComplete() {
  20. System.out.println("Done!");
  21. }
  22. }

接下来的这段代码把之前实现的反应式应用放到了Main类中,它会创建一个Publisher,之后使用TempSubscriber订阅该Publisher的消息。

代码清单 17-8 Main类: 创建Publisher并向其订阅TempSubscriber

  1. import java.util.concurrent.Flow.*;
  2. public class Main {
  3. public static void main( String[] args ) {
  4. getTemperatures( "New York" ).subscribe( new TempSubscriber() ); ←---- 创建一个新的纽约温度的Publisher,并向其订阅TempSubscriber 事件
  5. }
  6. private static Publisher<TempInfo> getTemperatures( String town ) { ←---- 向注册了该事件的Subscriber返回一个发送TempSubscriptionPublisher对象
  7. return subscriber -> subscriber.onSubscribe(
  8. new TempSubscription( subscriber, town ) );
  9. }
  10. }

这段代码中,getTemperatures方法返回的是一个Lambda表达式,它接受一个Subscriber对象作为参数,并调用它的onSubscribe方法。调用onSubscribe方法时,向其传入的参数是一个新创建的TempSubscription实例。由于这个Lambda表达式的签名与Publisher函数式接口中唯一的抽象方法保持一致,因此Java编译器会自动地将该Lambda表达式转换为Publisher对象(更多细节请参考第3章)。main方法为纽约的温度创建了一个Publisher,接着向它注册了一个新的TempSubscriber类实例。执行main函数的输出结果如下:

  1. New York : 44
  2. New York : 68
  3. New York : 95
  4. New York : 30
  5. Error!

上述执行结果中TempSubscription成功地获取了四次纽约的温度,在尝试第5次读取时失败了。看起来通过Flow API提供的四个接口中的三个,你就已经成功地解决了该问题。不过,你确定这段代码没有任何问题么?不用着急回答,你可以再思考一下,完成下面这个测验之后再给出答案。

测验17.1

我们开发的这个程序目前存在一个微妙的缺陷。不过,由于温度数据所构成的Stream会被TempInfo工厂方法随机抛出的异常中断,这个问题被隐藏了。如果注释掉随机生成错误的那段代码,让程序持续运行足够长的时间,你猜猜会发生什么情况?

答案:这段代码的问题在于每次TempSubscriber接受一个新的元素都会调用它的onNext方法,onNext方法又会向TempSubscription发送一个新请求,接着request方法又会向TempSubscriber发送另一个元素。这种递归的调用一个接着一个被压入栈,最终导致栈溢出,造成像下面这样的StackOverflowError错误:

  1. Exception in thread "main" java.lang.StackOverflowError
  2. at java.base/java.io.PrintStream.print(PrintStream.java:666)
  3. at java.base/java.io.PrintStream.println(PrintStream.java:820)
  4. at flow.TempSubscriber.onNext(TempSubscriber.java:36)
  5. at flow.TempSubscriber.onNext(TempSubscriber.java:24)
  6. at flow.TempSubscription.request(TempSubscription.java:60)
  7. at flow.TempSubscriber.onNext(TempSubscriber.java:37)
  8. at flow.TempSubscriber.onNext(TempSubscriber.java:24)
  9. at flow.TempSubscription.request(TempSubscription.java:60)

怎样才能修复这个问题,避免发生栈溢出呢?一种可行的解决方案是在TempSubscription中添加Executor,使用它通过另外一个线程向TempSubscriber发送新的元素。为了达到这个目标,你可以像下面的代码清单那样修改TempSubscription。(注意,这个类的实现是不完整的,完整的定义需要结合代码清单17-6剩余的部分。)

代码清单 17-9 为TempSubscription添加Executor

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class TempSubscription implements Subscription { ←---- 为了节省页面,刻意省略了原TempSubscription类中未改动的代码
  4. private static final ExecutorService executor =
  5. Executors.newSingleThreadExecutor();
  6. @Override
  7. public void request( long n ) {
  8. executor.submit( () -> { ←---- 另起一个线程向subscriber发送下一个元素
  9. for (long i = 0L; i < n; i++) {
  10. try {
  11. subscriber.onNext( TempInfo.fetch( town ) );
  12. } catch (Exception e) {
  13. subscriber.onError( e );
  14. break;
  15. }
  16. }
  17. });
  18. }
  19. }

Flow API定义了四个接口,目前为止,你仅使用了其中的三个。那么,什么时候使用Processor接口呢?为了解释这个问题,我们举一个例子,通过它你大概就能理解什么时候采用Processor接口了。譬如你需要创建一个Publisher,用来汇报温度数据,不过你收到了一个额外的要求,这些收集的数据要以摄氏温度而不是华氏温度的方式表示(假设你要收集的城市并不在美国)。这时使用Processor接口就非常适合了。

17.2.3 使用Processor转换数据

17.2.1节曾介绍过,Processor身兼两职,它既是一个Subscriber也是一个Publisher。实际上,我们经常将它注册到一个Publisher上,接收并转换完数据后,再把这些数据重新发布出去。这里我们举一个实际的例子,要求是实现一个Processor,它注册到一个发布以华氏温度表示温度数据的Publisher上,你需要将接收到的数据转换为摄氏温度并重新发布出去。代码清单如下。

代码清单 17-10 将温度由华氏温度转换为摄氏温度的Processor

  1. import java.util.concurrent.Flow.*;
  2. public class TempProcessor implements Processor<TempInfo, TempInfo> { ←---- TempInfo由一种格式转换为另一种格式的processor
  3. private Subscriber<? super TempInfo> subscriber;
  4. @Override
  5. public void subscribe( Subscriber<? super TempInfo> subscriber ) {
  6. this.subscriber = subscriber;
  7. }
  8. @Override
  9. public void onNext( TempInfo temp ) {
  10. subscriber.onNext( new TempInfo( temp.getTown(),
  11. (temp.getTemp() - 32) * 5 / 9) ); ←---- TempInfo转换为摄氏温度后重新发布
  12. }
  13. @Override
  14. public void onSubscribe( Subscription subscription ) {
  15. subscriber.onSubscribe( subscription ); (以下9行)所有其他的信号都原封不动地代理给上游的subscriber处理
  16. }
  17. @Override
  18. public void onError( Throwable throwable ) {
  19. subscriber.onError( throwable );
  20. }
  21. @Override
  22. public void onComplete() {
  23. subscriber.onComplete();
  24. }
  25. }

注意,在上面的代码中,onNextTempProcessor类中唯一一个包含业务逻辑的方法,它在将温度由华氏温度转换为摄氏温度后将其重新发布出去。所有其他实现Subscriber接口的方法都仅仅做了个二传手,把接收到的信号原封不动地传递给上游的SubscriberPublishersubscribe方法将上游的Subscriber注册到Processor中。

下面的这段代码清单在Main类中整合了TempProcessor对象,来看看它是怎样工作的。

代码清单 17-11 Main类: 创建Publisher并向其注册TempSubscriber

  1. import java.util.concurrent.Flow.*;
  2. public class Main {
  3. public static void main( String[] args ) {
  4. getCelsiusTemperatures( "New York" ) ←---- 为纽约创建一个摄氏温度版本的Publisher
  5. .subscribe( new TempSubscriber() ); ←---- TempSubscriber注册到该Publisher
  6. }
  7. public static Publisher<TempInfo> getCelsiusTemperatures(String town) {
  8. return subscriber -> {
  9. TempProcessor processor = new TempProcessor(); ←---- 创建TempProcessor对象,并将其插入Subscriber和返回的Publisher之间
  10. processor.subscribe( subscriber );
  11. processor.onSubscribe( new TempSubscription(processor, town) );
  12. };
  13. }
  14. }

再次执行Main时会生成下面的打印输出,可以看到这次温度都以典型的摄氏温度格式呈现了:

  1. New York : 10
  2. New York : -12
  3. New York : 23
  4. Error!

构成Flow API思想核心的是它基于“发布–订阅”协议的异步流处理模型,本节中,通过直接实现Flow API中定义的接口,我们对这一模型有了比较直观的理解。不过我们使用的示例与日常程序设计中的反应式编程略微有些不同,接下来的一节会讨论这些差异。

17.2.4 为什么Java并未提供Flow API的实现

Java 9的Flow API有点儿让人脑洞大开的意味。通常情况下Java库会同时提供接口和对应的实现给用户使用,然而这次Flow API并没有走寻常路——你需要自己实现Flow API。我们可以拿List API做例子,对比一下二者的不同。你大概很熟悉,Java提供的List接口已经被非常多的类实现了,其中包括ArrayList。更确切地说(这部分内容一般用户可能没那么关心)类ArrayList继承自抽象类AbstractList,而后者实现了LIst接口。与此相反,Java 9声明了Publisher接口,可是没有提供任何实现,这也是你只能定义自己版本实现的原因(当然,实现这些接口也能帮助你更好地学习它们,不过这并非其初衷)。面对现实吧——接口可以帮助你更好地构建你的程序思维,不过它并不能帮你更快地完成程序设计。

那到底是什么原因呢?答案是主要基于历史因素:反应式流有多个Java库的实现版本(譬如Akka和RxJava)。最初这些库都是独立开发的,虽然它们都基于“发布–订阅”的思想实现了反应式编程,但是使用的术语和API是迥异的。在Java 9标准化的过程中,这些库也在不断演进,最终它们都实现了java.util.concurrent.Flow接口,不再是仅仅实现了反应式的概念。标准化使得不同库之间互通和调用成为可能。

构建一个反应式流的实现相当复杂,因此大多数用户都倾向于使用现有的库。大多数实现接口的类库都会提供更加丰富的功能,而不是仅限于接口的最小实现集。

接下来的一节会学习目前市面上使用最广泛的反应式库:RxJava(Java的反应式扩展库),它由Netflix公司的工程师开发。我们会着重介绍RxJava 2.0版本,这也是当前最新的版本,其实现了Java 9的Flow接口。