15.5 “发布–订阅”以及反应式编程
Future和CompletableFuture的思维模式是计算的执行是独立且并发的。使用get()方法可以在执行结束后获取Future对象的执行结果。因此,Future是一个一次性对象,它只能从头到尾执行代码一次。
与此相反,反应式编程的思维模式是类Future的对象随着时间的推移可以产生很多的结果。我们举两个例子。首先,假设你要处理一个温度计对象。你的期望是,温度计对象会持续不断地生成结果,以每隔几秒的频率为你提供温度数据。另一个例子是Web服务器的监听组件对象。该组件监听来自网络的HTTP请求,并根据请求的内容返回相应的数据。接着,其他的代码会对结果数据进行处理,譬如温度或者HTTP请求中的数据。然后温度计和监听对象会继续检测温度、监听请求,直到有新的数据到来,周而复始。
这里有两点需要注意。核心的一点是,这些例子与Future非常像,然而它们可以完成(或产生)多次,而非一次性的操作。另一点是,第二个例子中,之前收到的结果与之后收到的结果可能都重要,而对温度计而言,大多数用户只关心最近的温度。但是,为什么要把这种编程叫作反应式的呢?答案是,程序的另一部分可能需要对低温报告做出反应,比如打开加热器。
你可能会说前述的思想不就是流吗,没什么特别的。如果你的程序能非常自然地适配流,那么流可能就是最合适你的实现。然而,总体来说,反应式编程的模式更具表现力。一个Java流只能由一个终端操作使用。15.3节提到过,流的编程模式让它很难表达一些类似流的操作,譬如将一个序列值进行切分,交由两个流水线(就像fork那样)来处理;或者处理和整合来自两个相互独立的流中的元素(就像join那样)。流支持的都是线性处理的流水线。
Java 9使用java.util.concurrent.Flow提供的接口对反应式编程进行建模,实现了名为“发布–订阅”的模型(也叫协议,简写为pub-sub)。第17章会详细介绍Java 9的Flow API,这里会提供一个简短的概要。反应式编程有三个主要的概念,分别是:
- 订阅者可以订阅的发布者;
- 名为订阅的连接;
- 消息(也叫事件),它们通过连接传输。
图15-9以图像的方式展示了该思想,其中的订阅(subscription)就像是管道,而发布者和订阅者类似于线框上的端口。多个组件可以向同一个发布者订阅,一个组件既可以发布多个相互独立的流,也可以向多个发布者订阅。接下来的一节会使用Java 9 Flow接口的术语逐步向你展示该思想是如何工作的。

图 15-9 “发布–订阅”模型
15.5.1 示例:对两个流求和
“发布–订阅”的一个简单却典型的例子是整合两个信息源的事件并发布给其他用户使用。这个流程一开始听起来可能很模糊,不过概念上我们把它想象成一个电子表格。假设电子表格中的一个单元格包含着公式。我们对电子表格的单元格C3进行建模,该单元格包含了公式“=C1+C2”。只要C1或者C2被更新(无论是有人对它进行了更新,还是因为该表格包含了其他的公式),C3也会更新以反映这些变化。假设下面的代码中,唯一可用的操作就是对单元格的值进行求和。
首先,对保存值的单元格进行建模:
private class SimpleCell {private int value = 0;private String name;public SimpleCell(String name) {this.name = name;}}
这时,代码还比较简单,你可以初始化几个单元格,如下所示:
SimpleCell c2 = new SimpleCell("C2");SimpleCell c1 = new SimpleCell("C1");
怎样才能指定当C1或C2的值发生变化时,C3会对这两个值重新进行求和计算呢?你需要一个途径让C3可以订阅C1和C2的事件。为了达到这一目标,我们引入了接口Publisher,它的核心代码看起来像下面这样:
interface Publisher<T> {void subscribe(Subscriber<? super T> subscriber);}
这个接口接受一个它可以通信的订阅者作为参数。Subscriber接口提供了一个简单的方法onNext,它接受信息作为参数,接下来你就可以按照自己的需求进行实现了:
interface Subscriber<T> {void onNext(T t);}
怎样把这两个概念整合到一起呢?你可能意识到了,单元格实际上既是一个发布者(它可以向其他单元格发布自己的事件)也是一个订阅者(需要依据其他单元格的事件进行响应)。Cell类的实现如下所示:
private class SimpleCell implements Publisher<Integer>, Subscriber<Integer> {private int value = 0;private String name;private List<Subscriber> subscribers = new ArrayList<>();public SimpleCell(String name) {this.name = name;}@Overridepublic void subscribe(Subscriber<? super Integer> subscriber) {subscribers.add(subscriber);}private void notifyAllSubscribers() { ←---- 该方法通知所有的订阅者有一个新值产生subscribers.forEach(subscriber -> subscriber.onNext(this.value));}@Overridepublic void onNext(Integer newValue) {this.value = newValue; ←---- 通过更新自己的值来响应它订阅的单元格所发生的新值变化System.out.println(this.name + ":" + this.value); ←---- 在终端窗口打印输出更新的值,此外还可以渲染作为UI一部分的发生变化的单元格notifyAllSubscribers(); ←---- 通知所有的订阅者更新的值}}
尝试几个简单的例子:
Simplecell c3 = new SimpleCell("C3");SimpleCell c2 = new SimpleCell("C2");SimpleCell c1 = new SimpleCell("C1");c1.subscribe(c3);c1.onNext(10); // 更新C1的值为10c2.onNext(20); // 更新C2的值为20
这段代码的输出如下所示,因为C3直接订阅了C1的事件:
C1:10C3:10C2:20
接下来怎么实现“C3=C1+C2”这个行为呢?你需要引入一个单独的类,用来保存算术操作符(左边和右边)两边的值:
public class ArithmeticCell extends SimpleCell {private int left;private int right;public ArithmeticCell(String name) {super(name);}public void setLeft(int left) {this.left = left;onNext(left + this.right); ←---- 更新单元格中的值,并通知所有事件订阅者该变化}public void setRight(int right) {this.right = right;onNext(right + this.left); ←---- 更新单元格中的值,并通知所有事件订阅者该变化}}
现在你可以尝试一个更现实例子:
ArithmeticCell c3 = new ArithmeticCell("C3");SimpleCell c2 = new SimpleCell("C2");SimpleCell c1 = new SimpleCell("C1");c1.subscribe(c3::setLeft);c2.subscribe(c3::setRight);c1.onNext(10); // 更新C1的值为10c2.onNext(20); // 更新C2的值为20c1.onNext(15); // 更新C1的值为15
这段代码的输出如下:
C1:10C3:10C2:20C3:30C1:15C3:35
审视这段输出,你会发现当C1更新为15时,C3会立刻进行响应,也同步更新它的值。发布者–订阅者交互的奇妙之处在于你可以建立发布者与订阅者之间的一幅图。你可以创建另一个单元格C5,通过表达式“C5=C3+C4”,可以指定它依赖于C3和C4,如下所示:
ArithmeticCell c5 = new ArithmeticCell("C5");ArithmeticCell c3 = new ArithmeticCell("C3");SimpleCell c4 = new SimpleCell("C4");SimpleCell c2 = new SimpleCell("C2");SimpleCell c1 = new SimpleCell("C1");c1.subscribe(c3::setLeft);c2.subscribe(c3::setRight);c3.subscribe(c5::setLeft);c4.subscribe(c5::setRight);
之后,你就可以在你的电子表格中进行各种更新了:
c1.onNext(10); // 更新C1的值为10c2.onNext(20); // 更新C2的值为20c1.onNext(15); // 更新C1的值为15c4.onNext(1); // 更新C4的值为1c4.onNext(3); // 更新C4的值为3
这些动作产生了下面的输出:
C1:10C3:10C5:10C2:20C3:30C5:30C1:15C3:35C5:35C4:1C5:36C4:3C5:38
最终C5的值是38,因为C1是15,C2是20,而C4是3。
术语
由于数据的流动是从发布者(生产者)流向订阅者(消费者),因此程序员经常使用诸如向上流(upstream)和向下流(downstream)这样的术语。前面的示例代码中,向上流
onNext()方法接收的数据newValue是由notifyAllSubscribers()方法传递给向下游的onNext()方法的。
这就是“发布–订阅”的核心思想。然而,我们也省略了一些东西没有讨论,有些是非常直观的装饰性内容,有的内容(譬如背压)极其重要,下一节会专门介绍。
首先,来看看那些非常直观的东西。15.2节介绍过,使用流进行编程时,你可能更希望发送信号给对象,而不是处理一个onNext事件,因此订阅者(监听者)需要定义onError和onComplete方法。这样一来,发布者才有机会告诉订阅者发生了异常,并终止数据流的发送(譬如,温度计的例子中,温度计可能被替换了,再也无法通过onNext方法返回更多的数据)。Java 9 Flow API中的Subscriber接口提供了对onError和onComplete方法的支持。这些方法是“发布–订阅”协议比传统的观察者模式更加强大的原因之一。
两个简单却重要的概念,即压力和背压,极大地丰富了Flow接口。这些概念看起来好像无足轻重,但是它们对程序能否充分利用线程的处理能力影响很大。还是以温度计为例,假设它之前以每隔几秒钟的频率返回温度数据,之后温度计进行了升级,能以更高的频率提供数据信息,譬如每毫秒报告一次温度数据。你的程序能以足够快的速度响应这些事件么?遭遇这种情况会不会发送缓冲区溢出,甚至是程序崩溃(回想一下我们之前碰到的问题场景,线程池一下涌入大量要处理的任务,同时又有一些任务被阻塞)?类似地,假设你向一个发布者订阅了服务,该服务会为你提供SMS消息服务,将SMS消息推送到你的手机上。这项订阅刚开始可能工作得很好,因为你的新手机上只有有限的几条SMS消息,几年之后,SMS消息的数量已经累积到数以千计的规模,这时候会发生什么情况呢?这些消息可能在一秒钟内调用onNext发送完毕么?这种情况通常被称作压力。
现在,假设有一个垂直的管道,其中装着标记了消息的球。你还需要一种“背压”机制,譬如限制多少球可以被加入圆筒。背压在Java 9的Flow API中是通过request()方法实现的。request()方法定义在一个新的接口Subscription中,该方法邀请发布者按照约定的数量发送下一次的元素,而不是以无限的速率发送元素(即采用拉模式,而不是推模式)。下一节会讨论这一主题。
15.5.2 背压
我们已经学习了如何向Publisher传递一个Subscriber对象(它包含了onNext、onError和OnComplete方法),Publisher会在恰当的时候调用该对象。这个对象被用于在Publisher与Subscriber之间传递信息。通过背压(流量控制),你可以限制信息传输的速率,当然在此之前,你需要通过Subscriber向Publisher发送相关的限制信息。此外,你还需要解决一个问题,一个Publisher可能有多个Subscriber,然而你希望你设置的背压只对点对点的连接生效,不影响其他的连接。为了解决这个问题,Java 9 Flow API中的Subscriber接口提供了第4个方法:
void onSubscribe (Subscription subscription);
当第一个事件通过Publisher与Subscriber之间的管道发送时,该方法就会被调用执行。Subscription对象包含的方法可以帮助Subscriber与Publisher进行通信,代码如下所示:
interface Subscription {void cancel ();void request (long n);}
请注意,回调函数经常有的“似乎后向兼容”效果。Publisher创建了Subscription对象,并将其传递给Subscriber,后者又可以调用它的方法由Subscriber向Publisher回传信息。
15.5.3 一种简单的真实背压
为了让“发布–订阅”连接每次只处理一个事件,你需要进行下面的变更。
- 在
Subscriber中本地存储由OnSubscribe方法传递的Subscription对象,为此,你可能需要为其添加一个subscription字段。 - 让
onSubscribe、onNext和onError(有可能也需要)的最后一个动作都是使用channel.request(1)请求下一个事件(注意只请求一个事件,避免Subscriber被太多的事件淹没)。 - 修改
Publisher,让本例中的notifyAllSubscribers方法只对提交了请求的管道发送onNext或者onError事件。 - 通常,
Publisher会创建一个新的Subscription对象,并将其与Subscriber一一对应,这样才能确保多个Subscriber可以按照自己设定的背压处理数据。
虽然这一流程看起来很简单,但是实现背压时还需要额外考虑一系列的取舍。
- 你是否要以最低速度向多个
Subscriber发送事件?或者你是否要为每个Subscriber维护一个单独的未发送数据队列? - 如果这些队列增长过快,会发生什么情况?
- 如果
Subscriber还未准备好接收数据,你会丢弃事件么?
做出什么样的选择取决于传送数据的语义。从一个序列中丢失一份温度报告可能无关痛痒,但如果丢失的是你银行账户的信用卡信息就严重了。
我们经常听到“基于拉模式的反应式背压”这一概念。之所以称其为“基于拉模式的反应式”,是因为它为Subscriber提供了一种途径,借助于事件(反应式)去“拉取”(通过request方法)Publisher提供的更多信息。其结果就是背压机制。
