17.3 使用反应式库RxJava
RxJava是支持反应式编程的首批Java语言库之一。它诞生于Netflix,是对微软.Net环境中反应式扩展(reactive extension,Rx)项目的迁移。RxJava 2.0为了与前文介绍的反应式流API保持一致进行了相应的调整,现在也支持java.util.concurrent.Flow。
使用Java语言时,如果你使用了一个第三方的库,很容易就能识别,因为你需要使用import导入第三方库。举例来说,为了使用Publisher,你导入了Java的Flow接口,就需要使用下面这行声明:
import java.lang.concurrent.Flow.*;
不过如果你想要用Observable版本的Publisher,那么你还需要像下面这行代码那样,导入对应的实现类。本章后面都需要进行类似的操作。
import io.reactivex.Observable;
我们有必要特别强调一个架构问题:优秀的系统架构通常会避免把仅在某个局部使用的细节概念暴露给整个系统。因此,一种推荐的做法是只在需要Observable的额外结构时使用Observable,否则就应该继续使用它的Publisher接口。注意,使用List接口时,你毫无疑问也应该遵循这一原则。有些时候即便你知道一个方法接受一个ArrayList类型的参数,为了避免暴露太多实现的细节或者限制未来潜在的变更,你可以将该参数的类型设置为List。事实上,通过上述定义,你给代码的设计带来了更多的灵活性,未来如何你需要变更实现,将参数由ArrayList替换成LinkedList,代码则不需要做大量的变更。
本节接下来的部分会使用RxJava的反应式流实现创建一个温度–报告系统。你要做的第一个决定是到底选择哪一个类构建系统,因为RxJava提供了两个Flow.Publisher类的实现版本。
阅读RxJava文档后,你会发现其中一个类是io.reactivex.Flowable类,它提供了代码清单17-7和代码清单17-9中介绍的Java 9 Flow中基于拉模式的背压特性(通过request方式)。背压可以防止Subscriber被Publisher快速生成的大量数据压垮。另一个类是RxJava最初始的版本,即io.reactivex.Observable的Publisher,它不支持背压。这个类更容易使用,同时也更适用于用户接口事件(譬如鼠标移动)。这些事件都是不适合进行背压的流(想象一下,你怎么能让用户慢些移动鼠标,或者停止移动鼠标!)。出于上述考虑,RxJava为处理通用流事件提供了这两个版本的类实现。
RxJava建议当你的流元素不超过一千个,或者你正处理的是基于图形用户界面的事件流,譬如鼠标移动或者触摸这些无法背压或不常发生的事件时,使用非背压版本的Observable。
由于前一节介绍Flow API时已经详细分析过背压,这里就不再花费额外的笔墨讨论Flowable了。相反,我们更倾向于用一个例子介绍如何使用不带背压的Observable接口。值得一提的是,Subscriber可以通过request(Long.MAX_VALUE)调用关闭背压功能。不过我们并不推荐用户执行这一操作,除非你非常确信Subscriber总是可以及时地处理完所有接收到的事件。
17.3.1 创建和使用Observable
Observable和Flowable类都提供了非常方便的工厂方法,使用它们你可以创建多种类型的反应式流(因为Observable和Flowable都实现了Publisher接口,所以这些工厂方法能够发布反应式流)。
如果你想通过最简单的方式创建Observable,那么可以像下面这样通过创建预定数量元素的方式实现:
Observable<String> strings = Observable.just( "first", "second" );
这里的just()工厂方法3可以将一个或多个元素转换为Observable,这些Observable在适当的时候又会释放出对应的元素。Observable的Subscriber会依次接收到onNext("first")、onNext("second")以及onComplete()消息。
3采用这个约定命名有点儿略显尴尬,原因是Stream以及Optional API掀起了一股以of()命名工厂方法的风潮,Java 8以of()为它们命名了类似的工厂方法。
另一个比较常见的是Observable工厂方法,尤其是你的应用需要与用户执行实时交互的时候,它会按照固定的时间间隔发出事件:
Observable<Long> onePerSec = Observable.interval(1, TimeUnit.SECONDS);
interval工厂方法返回一个名为onePerSec的Observable,它会以你选定的一个固定时间间隔(本例的时间间隔是一秒钟),发送一个由long类型值组成的无限递增序列,这个序列由0开始计数。接着,你可以用onePerSec作为另一个Observable的基础,每隔一秒反馈一次指定城市的温度报告。
你可以打印输出为了实现最终目标所进行的这些中间步骤,即每秒返回一次的温度。为了达到这个效果,你需要向onePerSec注册,以确保每过一秒都能接收到通知,然后获取、打印你关注的城市温度。在RxJava中,Observable4扮演了Flow API中Publisher的角色,因此Observable的行为与Flow中Subscriber接口的行为也很相似。在代码清单17-2中,RxJava的Observable接口声明了Java 9 Subscriber同样的方法。唯一的不同是,它的onSubscribe方法需要一个Disposable参数,而不是一个Subscription。正如前面提到的,Observable不支持背压,因此它也没有构造Subscription的request方法。Observable接口的完整定义如下:
4注意,从Java 9开始Observer接口和Observable类都已经不推荐使用了。新代码应该使用Flow API。通过它们我们可以了解RxJava的演进过程。
public interface Observer<T> {void onSubscribe(Disposable d);void onNext(T t);void onError(Throwable t);void onComplete();}
然而,请注意一点,RxJava的API比Java 9的Flow API更灵活(提供了更多的重载变量)。譬如,订阅一个Observable对象时,你可以直接传递一个Lambda表达式给它,只提供onNext方法的签名,完全忽略其他三个方法都可以。换句话说,你可以使用一个仅用接收事件的Consumer实现onNext方法的Observer去订阅一个Observable对象,onNext方法负责处理接收到的事件,其他方法都使用默认值,即事件处理完成或者发生异常时都不做操作。凭借这个特性,你只需要编写一行代码就可以订阅Observable onePerSec,打印输出纽约每秒钟的温度情况。代码如下所示:
onePerSec.subscribe(i -> System.out.println(TempInfo.fetch( "New York" )));
这行代码中,onePerSec Observable每秒钟发出一个事件。接收到这条消息后,Subscriber就会尝试获取纽约的温度并打印输出。然而,如果把这条语句放到main方法中,并试图去执行它的话,你不会看到任何输出,因为Observable执行每秒钟发布一条事件的线程是RxJava的计算线程池中的线程,它们都是守护线程。5然而你的main程序执行完就立刻退出了,结果导致守护线程还没产生任何输出就被终止了。
5这些细节在官方文档里并没有明确提及,不过你可以在开发者社区stackoverflow.com上找到针对这种现象的解释。
你可以借助一些非官方途径,避免程序立刻退出,譬如执行完上述的那行代码后立刻把线程切换到睡眠状态。更好的解决方案是用blockingSubscribe方法调用当前线程(在这个例子中就是main函数所在的线程)的回调函数。为了更好地执行演示,使用blockingSubscribe是最合适的途径了。然而在生产环境中,通常情况下,你都是像下面这样执行subscribe方法的:
onePerSec.blockingSubscribe(i -> System.out.println(TempInfo.fetch( "New York" )));
你得到的输出可能如下所示:
New York : 87New York : 18New York : 75java.lang.RuntimeException: Error!at flow.common.TempInfo.fetch(TempInfo.java:18)at flow.Main.lambda$main$0(Main.java:12)at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:59)at io.reactivex.internal.operators.observable.ObservableInterval$IntervalObserver.run(ObservableInterval.java:74)
非常不幸,遵循设计,温度查询操作可能会随机地失败(实际是每成功读取三次之后就失败一次)。由于你的Observer实现只有正常的处理逻辑,不包含任何出错和失效的管理,譬如onError,因此一旦发生失效,这些错误就会作为未捕获的异常直接暴露给用户。
现在我们要提高难度,让这个例子更复杂一些。假设你希望不仅要有出错管理,还要统计已有的数据。你要的不再是实时打印输出温度数据,而是为用户提供一个工厂方法,每秒返回一个包含温度数据的Observable对象,该对象在完成工作退出之前最多返回五次温度数据。你可以通过名为create的工厂方法借助Lambda创建Observable对象,该方法接受另一个Observable作为参数,返回值为void,代码清单如下。
代码清单 17-12 创建一个每秒一次返回温度的
Observable对象
public static Observable<TempInfo> getTemperature(String town) {return Observable.create(emitter -> ←---- 借由一个接受Observer对象的函数创建一个新的ObservableObservable.interval(1, TimeUnit.SECONDS) ←---- 通过Observable生成一个每秒递增的无限序列.subscribe(i -> {if (!emitter.isDisposed()) { ←---- 仅在被使用的Observer对象未被回收时(譬如由于前置操作失败)执行一些动作if ( i >= 5 ) { ←---- 如果已经返回了五次温度,就终止Observer对象,关闭对应的流emitter.onComplete();} else {try {emitter.onNext(TempInfo.fetch(town)); ←---- 否则,就向Observer发送下一个温度报告} catch (Exception e) {emitter.onError(e); ←---- 一旦发生错误时,就通知Observer对象}}}}));}
这段代码中,你通过向一个函数传递ObservableEmiter,并向其发送对应的事件,创建了返回的Observable。RxJava的ObservableEmitter接口继承自RxJava的基础类Emitter。你可以把Emitter想象成不带onSubscribe方法的Observer:
public interface Emitter<T> {void onNext(T t);void onError(Throwable t);void onComplete();}
ObservableEmitter提供了更多的方法,用于替Emitter设置新的Disposable,或者检查某个序列是否已经被下游处理过了。
你可以内部订阅一个Observable,就像onePerSec那样,以每隔一秒的频率发布一个无限递增的序列。在订阅函数(当然你还需要向订阅方法传递一个参数)的内部,你首先需要借助ObservableEmitter接口提供的isDisposed方法检查之前创建的Observer是否已经被处理了(如果上一个迭代中发生了错误,就会遭遇这种情况)。如果温度已经收集了五次,这段代码就会终止Observer对象,并关闭对应的流;否则就发送请求城市最新的温度报告给Observer对象。这段代码被包含在一个try/catch语句块中。如果获取温度时发生了错误或者异常,错误就会传递给Observer对象。
现在实现一个完整的Observer就比较简单了。这个Observer接下来会订阅getTemperature方法返回的Observable,打印输出它发布的温度数据,代码清单如下。
代码清单 17-13 用于打印输出接收温度的
Observer
import io.reactivex.Observer;import io.reactivex.disposables.Disposable;public class TempObserver implements Observer<TempInfo> {@Overridepublic void onComplete() {System.out.println( "Done!" );}@Overridepublic void onError( Throwable throwable ) {System.out.println( "Got problem: " + throwable.getMessage() );}@Overridepublic void onSubscribe( Disposable disposable ) {}@Overridepublic void onNext( TempInfo tempInfo ) {System.out.println( tempInfo );}}
这个Observer与代码清单17-7中的TempSubscriber很相似(TempSubscriber实现了Java 9的Flow.Subscriber),但是这里做了进一步的简化。因为RxJava的Observable不支持背压,所以处理完发布的事件后你不需要再调用request()方法请求更多的元素了。
在接下来的这段代码清单中,我们会创建一个main程序,让Observer订阅代码清单17-2中的getTemperature方法返回的Observable。
代码清单 17-14 打印输出纽约温度的
main类
public class Main {public static void main(String[] args) {Observable<TempInfo> observable = getTemperature( "New York" ); ←---- 创建一个Observable以每秒一次的频率发布纽约的温度报告observable.blockingSubscribe( new TempObserver() ); ←---- 通过一个简单的Observer订阅Observable,打印输出温度}}
假设这一次,温度获取过程中没有发生任何的错误,main函数每隔一秒打印输出一条温度记录,五次之后Observable发出了onComplete信号。这种情况下,你看到的输出可能是下面这样的:
New York : 69New York : 26New York : 85New York : 94New York : 29Done!
是时候进一步丰富我们的RxJava例子了,尤其是看一下它是如何帮助我们操纵一个和多个反应式流的。
17.3.2 转换及整合多个Observable
与原生的Java 9 Flow API比较起来,RxJava及其他的三方反应式库主要的优势之一是它们往往提供了更加丰富的函数集,可以更灵活地对流进行整合、创建以及过滤操作。之前演示过,一个流可以作为另一个流的输入。此外,17.2.3节介绍过Java 9的Flow.Processor,它可以对流中的数据进行转换,譬如将温度由华氏温度转换为摄氏温度。你还可以过滤流中的数据,找出你关心的元素创建一个新的流,然后使用特定的映射函数对这些元素进行转换(这些都可以通过Flow.Processor实现),你甚至可以通过多种方式合并或整合两个流(这些目前还无法通过Flow.Processor实现)。
流的转换与合并函数非常复杂,截至目前,我们都是通过单纯的文字描述来介绍,这些介绍的内容对读者而言可能晦涩难懂。举个例子来说,我们看看RxJava文档对它提供的mergeDelayError函数的描述:
对向一个
Observable发送多个Observable的Observable进行扁平处理。它允许一个Observer从所有的源Observable接收多个成功发送的元素,并且该操作不会被某一个Observable发送失败所影响,同时你还可以控制这些Observable对象上并发的订阅数目。
你一定也被上面的这段函数描述搞晕了,这看起来并不是很直观。为了解决这个问题,反应式流社区决定以一种可视化的方式描述这些函数的行为。这种可视化的方式叫作弹珠图。弹珠图(譬如图17-4)通过水平线上的几何图形表示反应式流中元素的临时顺序;通过特殊符号表示错误以及事件完成的信号。图中的方框表示命名操作是如何转换那些元素或者整合多个流的。

图 17-4 弹珠图示例——文档化典型反应式库的操作
使用这种标记方式,可以很容易地对RxJava库中所有的函数进行可视化表示,如图17-5所示,它是对map(转换由Observable发布的元素)和merge(将由两个或多个Observable发布的事件整合在一起)的可视化。

图 17-5 函数map和merge对应的弹珠图
你可能会思考如何使用map和merge改进前一节中开发的RxJava示例,甚至为它增加新的特性。map能提供更加精准的控制,譬如在执行从华氏温度到摄氏温度的转换时,用map就比直接使用Flow API的Processor要灵活得多,示例代码清单如下。
代码清单 17-15 使用
map处理Observable实现从华氏温度到摄氏温度的转换
public static Observable<TempInfo> getCelsiusTemperature(String town) {return getTemperature( town ).map( temp -> new TempInfo( temp.getTown(),(temp.getTemp() - 32) * 5 / 9) );}
这个简短的方法接受代码清单17-12中getTemperature方法返回的Observable对象,返回一个新的Observable,这个Observable以每秒一个的频率,将Observable返回的温度由华氏温度转换为摄氏温度并发布出去。
为了加强你对如何处理由Observable返回的元素的理解,建议你尽量尝试使用下面测验中的新方法去操作,处理返回的元素。
测验17.2:过滤出那些值为负的温度
Observable类的filter方法接受一个Predicate做参数,返回一个新的Observable,这个新的Observable只发布符合Predicate定义要求的元素。假设你需要开发一个预警系统,在有结冰的危险时,提醒用户做好相应的预防措施。你怎样借助这个操作符创建一个Observable对象,使其仅在指定城市的温度低于零度时,才以摄氏温度的格式返回对应的温度呢(从水的冰点考虑,使用摄氏温度由零开始计算要容易得多)?答案:用代码清单17-15返回的
Observable,搭配一个接受Predicate的filter操作符就可以完美地实现这一需求。这个Predicate会找出所有温度为负值的元素。代码如下所示:
public static Observable<TempInfo> getNegativeTemperature(String town) {return getCelsiusTemperature( town ).filter( temp -> temp.getTemp() < 0 );}
现在假设要求你对上述方法进行泛化,允许用户设定城市时,既可以指定单一城市,也可以指定由多个城市组成的集合,但返回的依旧是发布温度数据的Observable对象。代码清单17-16实现了最新的需求,它为每个城市分别调用了代码清单17-15中的方法,并使用merge方法整合了这些调用所返回的Observable。
代码清单 17-16 使用
merge合并多个城市的温度
public static Observable<TempInfo> getCelsiusTemperatures(String... towns) {return Observable.merge(Arrays.stream(towns).map(TempObservable::getCelsiusTemperature).collect(toList()));}
这个方法中,接受查询城市的变量是一个变长参数,你可以指定一个城市的集合。这个变长参数会被转换为一个字符串流,接着每个字符串会被传递给代码清单17-11中的getCelsiusTemperature方法(它在代码清单17-15中进行过改良)。通过这种方式,每个城市都被转换成了以每秒一次频率发布温度数据的Observable对象。最终,这个Observable流被收集到了一个列表中,列表被传递给了Observable类自身的静态工厂方法merge。该方法迭代遍历访问每一个Observable元素,并整合其输出,让它们的行为表现得就像一个单一的Observable对象一样。换句话说,最终的这个Observable会发布由Iterable传递的所有Observable对象发布的事件,并保持其原有的顺序。
为了测试这个方法,我们将在一个main类中调用它,代码清单如下。
代码清单 17-17 打印输出三个城市温度的
main类
public class Main {public static void main(String[] args) {Observable<TempInfo> observable = getCelsiusTemperatures("New York", "Chicago", "San Francisco" );observable.blockingSubscribe( new TempObserver() );}}
这个main类与代码清单17-14几乎是一样的,只不过你现在订阅的是由代码清单17-16的getCelsiusTemperatures方法返回的Observable,从而打印输出了三个城市的温度数据。执行这个main类会产生下面这样的输出:
New York : 21Chicago : 6San Francisco : -15New York : -3Chicago : 12San Francisco : 5Got problem: Error!
main类每秒打印输出请求城市的温度数据,直到某次温度查询操作失败,抛出一个异常。该异常会传递给Observable中断流数据的处理。
本章的目标并不是全面完整地介绍RxJava(或者其他的反应式库),要达到这样的效果可能需要一整本书的内容。我们只希望通过这些介绍能让你对这种工具集有一些感性的认识,包括它们是如何工作的,以及反应式编程的基本原则是什么。本章只涉及了这种新型编程方式的皮毛,不过希望这种编程模式的优点能燃起你对它的兴趣。
