9.7 响应式编程

CompletableFuture背后的概念可以从单一的返回值推广到数据流,这就是响应式编程。响应式编程其实是一种声明式编程方法,它让程序员以自动流动的变化和数据流来编程。

你可以将电子表格想象成一个使用响应式编程的例子。如果在单元格C1中键入=B1+5,其实是在告诉电子表格将B1中的值加5,然后将结果存入C1。而且,将来B1中的值变化后,电子表格会自动刷新C1中的值。

RxJava类库将这种响应式的理念移植到了JVM。我们这里不会深入类库,只描述其中的一些关键概念。

RxJava类库引入了一个叫作Observable的类,该类代表了一组待响应的事件,可以理解为一沓欠条。在Observable对象和第3章讲述的Stream接口之间有很强的关联。

两种情况下,都需要使用Lambda表达式将行为和一般的操作关联、都需要将高阶函数链接起来定义完成任务的规则。实际上,Observable定义的很多操作都和Stream的相同:mapfilterreduce

最大的不同在于用例。Stream是为构建内存中集合的计算流程而设计的,而RxJava则是为了组合异步和基于事件的系统流程而设计的。它没有取数据,而是把数据放进去。换个角度理解RxJava,它是处理一组值,而CompletableFuture用来处理一个值。

这次的例子是查找艺术家,如例9-5所示。search方法根据名字和国籍过滤结果,它在本地缓存了一份艺术家名单,但必须从外部服务上查询艺术家信息,比如国籍。

例9-15 通过名字和国籍查找艺术家

  1. public Observable<Artist> search(String searchedName,
  2. String searchedNationality,
  3. int maxResults) {
  4. return getSavedArtists()
  5. .filter(name -> name.contains(searchedName))
  6. .flatMap(this::lookupArtist)
  7. .filter(artist -> artist.getNationality()
  8. .contains(searchedNationality))
  9. .take(maxResults);
  10. }

在➊处取得一个包含艺术家姓名的Observable对象,该对象的高阶函数和Stream类似,在➋和➍处使用姓名和国籍做过滤,和使用Stream是一样的。

在➌处将姓名替换为一个Artist对象,如果这只是调用构造函数这么简单,我们显然会使用map操作。但这里我们需要组合调用一系列外部服务,每种服务都可能在它自己的线程或线程池里执行。因此,我们将名字替换为Observable对象,来表示一个或多个艺术家,因此使用了flatMap操作。

我们还需要在查找时限定返回结果的最大值:maxResults,在➎处,我们通过调用Observable对象的take方法来实现该功能。

读者会发现,这个API很像使用Stream。它和Stream的最大区别是:Stream是为了计算最终结果,而RxJava在线程模型上则像CompletableFuture

使用CompletableFuture时,我们通过给complete方法一个值来偿还欠条。而Observable代表了一个事件流,我们需要有能力传入多个值,例9-16展示了该怎么做。

例9-16 给Observable对象传值,并且完成它

  1. observer.onNext("a");
  2. observer.onNext("b");
  3. observer.onNext("c");
  4. observer.onCompleted();

我们不停地调用onNext方法,Observable对象中的每个值都调用一次。这可以在一个循环里做,也可以在任何我们想要生成值的线程里做。一旦完成了产生事件的工作,就调用onCompleted方法表示任务完成。和使用Stream一样,也有一些静态工厂方法用来从Future、迭代器和数组中创建Observable对象。

CompletableFuture类似,Observable也能处理异常。如果出现错误,调用onError方法,如例9-17所示。这里的功能和CompletableFuture略有不同——你能得到异常发生之前所有的事件,但两种情况下,只能正常或异常地终结程序,两者只能选其一。

例9-17 通知Observable对象有错误发生

  1. observer.onError(new Exception());

和介绍CompletableFuture时一样,这里只给出了如何使用和在什么地方使用Observable的一点建议。读者如果想了解跟多细节,请阅读项目文档(https://github.com/ReactiveX/RxJava/wiki/Getting-Started)。RxJava已经开始集成进Java类库的生态系统,比如企业级的集成框架Apache Camel已经加入了一个叫作Camel RX(http://camel.apache.org/rx.html)的模块,该模块使得可以在该框架中使用RxJava。Vert.x项目也启动了一个Rx-ify(https://github.com/vert-x/mod-rxvertx)它的API项目。