10.3 第二个例子:新闻系统
前面的例子使用了SubmissionPublisher类,因此没有实现Flow.Publisher接口和Flow.Subscription接口。如果SubmissionPublisher提供的功能不符合需求,那么必须实现自己的发布者和订阅关系。
本节,你将学习如何实现这两个接口,进而理解反应流的规范。本节将实现一个新闻系统,其中每则新闻将与一个类别相关联。订阅者将订阅一个或多个类别,而发布者只会向每个订阅相应类别的订阅者发送新闻。
10.3.1 News类
要实现的第一个类是News类。该类描述了要从发布者发送给消费者的每则新闻。我们将存储三个属性。
category属性:一个存储新闻类别的int值。它可以采用数值0、1、2和3分别表示体育、世界、经济和科学类别的新闻。txt属性:存储新闻文本的String值。date属性:存储新闻日期的Date值。
和往常一样,仍然要将这些属性声明为private,并且实现相应的get()方法和set()方法获取和设置这些属性值。
10.3.2 发布者相关的类
我们需要四个类来实现Flow.Publisher接口和Flow.Subscription接口。第一个是实现了Flow.Subscription接口的MySubscription类。我们将在该类中保存三个属性。
canceled属性:用于指示订阅是否被取消的布尔值。requested属性:用于存储消费者所请求的新闻条数的AtomicLong值。categories属性:用于存储与当前订阅相关联的新闻类别的一组整型值。
下面的代码展示了对上述属性的声明。
public class MySubscription implements Subscription {private boolean cancelled = false;private AtomicLong requested = new AtomicLong(0);private Set<Integer> categories;
然后,还要实现Flow.Subscription接口所提供的两个方法:cancel()方法和request()方法。
@Overridepublic void cancel() {cancelled=true;}@Overridepublic void request(long value) {requested.addAndGet(value);}
cancel()方法只是将cancelled属性设置为true,而request()方法则会增加requested属性的值。在实际例子中,可能还要对那些作为参数传递给这些方法的值进行验证。
然后,我们还实现了其他方法来获取和设置该类的各属性值。
isCancelled():该方法返回cancelled属性的值。getRequested():该方法使用get()方法返回requested属性的值。decreaseRequested():该方法使用decrementAndGet()方法减少requested属性的值。setCategories():该方法设定categories属性的值。hasCategory():该方法返回布尔值,指明参数中的类别(一个int值)是否与当前订阅相关联。
然后实现ConsumerData类。我们将使用该类存储订阅者的信息,以及发布者和订阅者之间的订阅关系。因此,该类有如下两个属性。
consumer属性:使用News类参数化的Subscriber值。它将存储新闻消费者的关联关系。subscription属性:与发布者和订阅者之间的订阅关系相关的MySubscription值。
我们还给出了获取和设置这两个属性值的get()方法和set()方法。
然后,还要实现PublisherTask类,该类实现了Runnable接口。我们将使用这样的任务向消费者发送条目。我们声明了两个属性来存储与消费者相关的数据、消费者和发布者之间的订阅关系,以及想要发送的条目(在我们的例子中是一则新闻)。
consumerData属性:如前所述,consumerData对象分别存储了Subscriber对象和MySubscription对象。前者含有各条目的消费者,后者包含发布者与发布者之间的订阅关系。news属性:含有想要发送给订阅者的新闻的News对象。
使用该类的构造函数初始化这两个属性。
public class PublisherTask implements Runnable {private ConsumerDataconsumerData;private News news;public PublisherTask(ConsumerDataconsumerData, News news) {this.consumerData = consumerData;this.news = news;}
然后,实现run()方法。该方法将检查是否必须将News对象发送给订阅者。它将检查以下三个条件。
- 订阅没有取消:使用
subscription对象的isCancelled()方法。 - 订阅者请求了更多的条目:使用
subscription对象的getRequested()方法。 News对象的类别存在于与该订阅者关联的类别集中:使用subscription对象的hasCategory()方法。
如果该news对象通过了这三个条件,那么使用onNext()方法将其发送给订阅者。我们还使用了subscription对象的decreaseRequested()方法来减少该订阅者请求的条目数。该方法的源代码如下:
@Overridepublic void run() {MySubscription subscription = consumerData.getSubscription();if (!(subscription.isCanceled()) && (subscription.getRequested() > 0)&& (subscription.hasCategory(news.getCategory()))) {consumerData.getConsumer().onNext(news);subscription.decreaseRequested();}}
最后实现MyPublisher类。该类实现了采用News类参数化的Flow.Publisher接口。我们将使用两个属性来实现该类的行为。
consumers属性:一个使用ConsumerData类参数化的ConcurrentLinkedDeque对象,用于存储该发布者的所有订阅者的信息。executor属性:一个用于执行PublisherTask对象的ThreadPoolExecutor对象。
使用该类的构造函数初始化这两个属性。
public class MyPublisher implements Publisher<News> {private ConcurrentLinkedDeque<ConsumerData> consumers;private ThreadPoolExecutor executor;public MyPublisher() {consumers=new ConcurrentLinkedDeque<>();executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());}
然后,实现Flow.Publisher接口提供的subscribe()方法。该方法接收想要订阅该发布者的Subscriber对象作为参数。创建一个新的MySubscription对象、一个新的ConsumerData对象(添加到消费者的数据结构),并且调用Subscriber对象的onSubscribe()方法(其参数为MySubscription对象)。
@Overridepublic void subscribe(Subscriber<? super News> subscriber) {ConsumerDataconsumerData=new ConsumerData();consumerData.setConsumer((Subscriber<News>)subscriber);MySubscription subscription=new MySubscription();consumerData.setSubscription(subscription);subscriber.onSubscribe(subscription);consumers.add(consumerData);}
然后,实现publish()方法。该方法接收一个News对象作为参数,并尝试将其发送给该发布者的所有订阅者。处理存储在Consumers数据结构中的所有元素,创建一个新的PublisherTask对象,并使用execute()方法在执行器中执行它们。
如果发生错误,将对subscriber对象使用onError()方法,以便将错误通知给订阅者 。
public void publish(News news) {consumers.forEach( consumerData -> {try {executor.execute(new PublisherTask(consumerData, news));} catch (Exception e) {consumerData.getConsumer().onError(e);}});}
最后,实现shutdown()方法。该方法将通知所有订阅者通信结束,并且完成内部ThreadPoolExecutor的执行。
public void shutdown() {consumers.forEach( consumerData -> {consumerData.getConsumer().onComplete();});executor.shutdown();}}
在这四个类中,我们实现了该示例的发布者部分。接下来介绍消费者部分的实现。
10.3.3 Consumer类
该类实现了Flow.Subscriber接口,并且实现了新闻的消费者。在内部,它使用了三个属性。
subscription属性:一个MySubscription对象,它存储了订阅者和发布者之间的订阅关系。name属性:一个存储订阅者名称的String属性。categories属性:一个整型数值集合,存储了该订阅者想要接收的消息的类别。
和此前一样,使用该类的构造函数初始化这些属性。
public class Consumer implements Subscriber<News> {private MySubscription subscription;private String name;private Set<Integer> categories;public Consumer(String name, Set<Integer> categories) {this.name=name;this.categories = categories;}
现在,要实现Flow.Subscriber接口提供的方法了。onComplete()方法和onError()方法仅在控制台输出信息。
@Overridepublic void onComplete() {System.out.printf("%s - %s: Consumer - Completed\n", name,Thread.currentThread().getName());}@Overridepublic void onError(Throwable exception) {System.out.printf("%s - %s: Consumer - Error: %s\n", name,Thread.currentThread().getName(),exception.getMessage());}
onSubscribe()方法接收Subscription对象作为参数,在subscription属性中存储该对象,并使用与此订阅者相关联的类别更新该属性。最后,使用request()方法请求第一个News对象。
@Overridepublic void onSubscribe(Subscription subscription) {this.subscription = (MySubscription)subscription;this.subscription.setCategories(this.categories);this.subscription.request(1);System.out.printf("%s: Consumer - Subscription\n",Thread.currentThread().getName());}
最后实现onNext()方法,该方法接收一个News对象作为参数,在控制台输出该对象的信息,并且使用request()方法请求下一个对象。
@Overridepublic void onNext(News item) {System.out.printf("%s - %s: Consumer - News\n", name,Thread.currentThread().getName());System.out.printf("%s - %s: Text: %s\n", name,Thread.currentThread().getName(),item.getTxt());System.out.printf("%s - %s: Category: %s\n", name,Thread.currentThread().getName(),item.getCategory());System.out.printf("%s - %s: Date: %s\n", name,Thread.currentThread().getName(),item.getDate());subscription.request(1);}
10.3.4 Main类
最后,使用main()方法实现Main类,测试在该示例中实现的所有类。
创建一个MyPublisher对象和三个Consumer对象,如下所示。
consumer1对象只接收运动方面的新闻。consumer2对象只接收关于科学的新闻。consumer3对象只接收四种类别的新闻。
创建这些对象并且将它们订阅到发布者。
public class Main {public static void main(String[] args) {MyPublisher publisher=new MyPublisher();Subscriber<News>consumer1, consumer2, consumer3;Set<Integer> sports = new HashSet();sports.add(News.SPORTS);consumer1=new Consumer("Sport Consumer",sports);Set<Integer> science = new HashSet();science.add(News.SCIENCE);consumer2=new Consumer("Science Consumer", science);Set<Integer> all = new HashSet();all.add(News.ECONOMIC);all.add(News.SCIENCE);all.add(News.SPORTS);all.add(News.WORLD);consumer3=new Consumer("All Consumer", all);publisher.subscribe(consumer1);publisher.subscribe(consumer2);publisher.subscribe(consumer3);System.out.printf("Main: Start\n");
然后,使用publisher对象将四则新闻(每个类别各一条)发送给消费者。每则新闻之间间隔1秒钟。
News news=new News();news.setTxt("Basketball news");news.setCategory(News.SPORTS);news.setDate(new Date());publisher.publish(news);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}news=new News();news.setTxt("Money news");news.setCategory(News.ECONOMIC);news.setDate(new Date());publisher.publish(news);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}news=new News();news.setTxt("Europe news");news.setCategory(News.WORLD);news.setDate(new Date());publisher.publish(news);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}news=new News();news.setTxt("Space news");news.setCategory(News.SCIENCE);news.setDate(new Date());publisher.publish(news);
最后,使用publisher对象的shutdown()方法完成系统所有要素的执行。
publisher.shutdown();System.out.printf("Main: End\n");}}
下面的屏幕截图显示了本例执行时的一部分输出结果。可以看到consumer3对象接收了所有新闻,但是consumer1和consumer2对象只接收相关类别的新闻。

