10.3 第二个例子:新闻系统

前面的例子使用了SubmissionPublisher类,因此没有实现Flow.Publisher接口和Flow.Subscription接口。如果SubmissionPublisher提供的功能不符合需求,那么必须实现自己的发布者和订阅关系。

本节,你将学习如何实现这两个接口,进而理解反应流的规范。本节将实现一个新闻系统,其中每则新闻将与一个类别相关联。订阅者将订阅一个或多个类别,而发布者只会向每个订阅相应类别的订阅者发送新闻。

10.3.1 News

要实现的第一个类是News类。该类描述了要从发布者发送给消费者的每则新闻。我们将存储三个属性。

  • category属性:一个存储新闻类别的int值。它可以采用数值0、1、2和3分别表示体育、世界、经济和科学类别的新闻。
  • txt属性:存储新闻文本的String值。
  • date属性:存储新闻日期的Date值。

和往常一样,仍然要将这些属性声明为private,并且实现相应的get()方法和set()方法获取和设置这些属性值。

10.3.2 发布者相关的类

我们需要四个类来实现Flow.Publisher接口和Flow.Subscription接口。第一个是实现了Flow.Subscription接口的MySubscription类。我们将在该类中保存三个属性。

  • canceled属性:用于指示订阅是否被取消的布尔值。
  • requested属性:用于存储消费者所请求的新闻条数的AtomicLong值。
  • categories属性:用于存储与当前订阅相关联的新闻类别的一组整型值。

下面的代码展示了对上述属性的声明。

  1. public class MySubscription implements Subscription {
  2. private boolean cancelled = false;
  3. private AtomicLong requested = new AtomicLong(0);
  4. private Set<Integer> categories;

然后,还要实现Flow.Subscription接口所提供的两个方法:cancel()方法和request()方法。

  1. @Override
  2. public void cancel() {
  3. cancelled=true;
  4. }
  5. @Override
  6. public void request(long value) {
  7. requested.addAndGet(value);
  8. }

cancel()方法只是将cancelled属性设置为true,而request()方法则会增加requested属性的值。在实际例子中,可能还要对那些作为参数传递给这些方法的值进行验证。

然后,我们还实现了其他方法来获取和设置该类的各属性值。

  • isCancelled():该方法返回cancelled属性的值。
  • getRequested():该方法使用get()方法返回requested属性的值。
  • decreaseRequested():该方法使用decrementAndGet()方法减少requested属性的值。
  • setCategories():该方法设定categories属性的值。
  • hasCategory():该方法返回布尔值,指明参数中的类别(一个int值)是否与当前订阅相关联。

然后实现ConsumerData类。我们将使用该类存储订阅者的信息,以及发布者和订阅者之间的订阅关系。因此,该类有如下两个属性。

  • consumer属性:使用News类参数化的Subscriber值。它将存储新闻消费者的关联关系。
  • subscription属性:与发布者和订阅者之间的订阅关系相关的MySubscription值。

我们还给出了获取和设置这两个属性值的get()方法和set()方法。

然后,还要实现PublisherTask类,该类实现了Runnable接口。我们将使用这样的任务向消费者发送条目。我们声明了两个属性来存储与消费者相关的数据、消费者和发布者之间的订阅关系,以及想要发送的条目(在我们的例子中是一则新闻)。

  • consumerData属性:如前所述,consumerData对象分别存储了Subscriber对象和MySubscription对象。前者含有各条目的消费者,后者包含发布者与发布者之间的订阅关系。
  • news属性:含有想要发送给订阅者的新闻的News对象。

使用该类的构造函数初始化这两个属性。

  1. public class PublisherTask implements Runnable {
  2. private ConsumerDataconsumerData;
  3. private News news;
  4. public PublisherTask(ConsumerDataconsumerData, News news) {
  5. this.consumerData = consumerData;
  6. this.news = news;
  7. }

然后,实现run()方法。该方法将检查是否必须将News对象发送给订阅者。它将检查以下三个条件。

  • 订阅没有取消:使用subscription对象的isCancelled()方法。
  • 订阅者请求了更多的条目:使用subscription对象的getRequested()方法。
  • News对象的类别存在于与该订阅者关联的类别集中:使用subscription对象的hasCategory()方法。

如果该news对象通过了这三个条件,那么使用onNext()方法将其发送给订阅者。我们还使用了subscription对象的decreaseRequested()方法来减少该订阅者请求的条目数。该方法的源代码如下:

  1. @Override
  2. public void run() {
  3. MySubscription subscription = consumerData.getSubscription();
  4. if (!(subscription.isCanceled()) && (subscription.getRequested() > 0)
  5. && (subscription.hasCategory(news.getCategory()))) {
  6. consumerData.getConsumer().onNext(news);
  7. subscription.decreaseRequested();
  8. }
  9. }

最后实现MyPublisher类。该类实现了采用News类参数化的Flow.Publisher接口。我们将使用两个属性来实现该类的行为。

  • consumers属性:一个使用ConsumerData类参数化的ConcurrentLinkedDeque对象,用于存储该发布者的所有订阅者的信息。
  • executor属性:一个用于执行PublisherTask对象的ThreadPoolExecutor对象。

使用该类的构造函数初始化这两个属性。

  1. public class MyPublisher implements Publisher<News> {
  2. private ConcurrentLinkedDeque<ConsumerData> consumers;
  3. private ThreadPoolExecutor executor;
  4. public MyPublisher() {
  5. consumers=new ConcurrentLinkedDeque<>();
  6. executor = (ThreadPoolExecutor)Executors.newFixedThreadPool
  7. (Runtime.getRuntime().availableProcessors());
  8. }

然后,实现Flow.Publisher接口提供的subscribe()方法。该方法接收想要订阅该发布者的Subscriber对象作为参数。创建一个新的MySubscription对象、一个新的ConsumerData对象(添加到消费者的数据结构),并且调用Subscriber对象的onSubscribe()方法(其参数为MySubscription对象)。

  1. @Override
  2. public void subscribe(Subscriber<? super News> subscriber) {
  3. ConsumerDataconsumerData=new ConsumerData();
  4. consumerData.setConsumer((Subscriber<News>)subscriber);
  5. MySubscription subscription=new MySubscription();
  6. consumerData.setSubscription(subscription);
  7. subscriber.onSubscribe(subscription);
  8. consumers.add(consumerData);
  9. }

然后,实现publish()方法。该方法接收一个News对象作为参数,并尝试将其发送给该发布者的所有订阅者。处理存储在Consumers数据结构中的所有元素,创建一个新的PublisherTask对象,并使用execute()方法在执行器中执行它们。

如果发生错误,将对subscriber对象使用onError()方法,以便将错误通知给订阅者 。

  1. public void publish(News news) {
  2. consumers.forEach( consumerData -> {
  3. try {
  4. executor.execute(new PublisherTask(consumerData, news));
  5. } catch (Exception e) {
  6. consumerData.getConsumer().onError(e);
  7. }
  8. });
  9. }

最后,实现shutdown()方法。该方法将通知所有订阅者通信结束,并且完成内部ThreadPoolExecutor的执行。

  1. public void shutdown() {
  2. consumers.forEach( consumerData -> {
  3. consumerData.getConsumer().onComplete();
  4. });
  5. executor.shutdown();
  6. }
  7. }

在这四个类中,我们实现了该示例的发布者部分。接下来介绍消费者部分的实现。

10.3.3 Consumer

该类实现了Flow.Subscriber接口,并且实现了新闻的消费者。在内部,它使用了三个属性。

  • subscription属性:一个MySubscription对象,它存储了订阅者和发布者之间的订阅关系。
  • name属性:一个存储订阅者名称的String属性。
  • categories属性:一个整型数值集合,存储了该订阅者想要接收的消息的类别。

和此前一样,使用该类的构造函数初始化这些属性。

  1. public class Consumer implements Subscriber<News> {
  2. private MySubscription subscription;
  3. private String name;
  4. private Set<Integer> categories;
  5. public Consumer(String name, Set<Integer> categories) {
  6. this.name=name;
  7. this.categories = categories;
  8. }

现在,要实现Flow.Subscriber接口提供的方法了。onComplete()方法和onError()方法仅在控制台输出信息。

  1. @Override
  2. public void onComplete() {
  3. System.out.printf("%s - %s: Consumer - Completed\n", name,
  4. Thread.currentThread().getName());
  5. }
  6. @Override
  7. public void onError(Throwable exception) {
  8. System.out.printf("%s - %s: Consumer - Error: %s\n", name,
  9. Thread.currentThread().getName(),
  10. exception.getMessage());
  11. }

onSubscribe()方法接收Subscription对象作为参数,在subscription属性中存储该对象,并使用与此订阅者相关联的类别更新该属性。最后,使用request()方法请求第一个News对象。

  1. @Override
  2. public void onSubscribe(Subscription subscription) {
  3. this.subscription = (MySubscription)subscription;
  4. this.subscription.setCategories(this.categories);
  5. this.subscription.request(1);
  6. System.out.printf("%s: Consumer - Subscription\n",
  7. Thread.currentThread().getName());
  8. }

最后实现onNext()方法,该方法接收一个News对象作为参数,在控制台输出该对象的信息,并且使用request()方法请求下一个对象。

  1. @Override
  2. public void onNext(News item) {
  3. System.out.printf("%s - %s: Consumer - News\n", name,
  4. Thread.currentThread().getName());
  5. System.out.printf("%s - %s: Text: %s\n", name,
  6. Thread.currentThread().getName(),item.getTxt());
  7. System.out.printf("%s - %s: Category: %s\n", name,
  8. Thread.currentThread().getName(),
  9. item.getCategory());
  10. System.out.printf("%s - %s: Date: %s\n", name,
  11. Thread.currentThread().getName(),item.getDate());
  12. subscription.request(1);
  13. }

10.3.4 Main

最后,使用main()方法实现Main类,测试在该示例中实现的所有类。

创建一个MyPublisher对象和三个Consumer对象,如下所示。

  • consumer1对象只接收运动方面的新闻。
  • consumer2对象只接收关于科学的新闻。
  • consumer3对象只接收四种类别的新闻。

创建这些对象并且将它们订阅到发布者。

  1. public class Main {
  2. public static void main(String[] args) {
  3. MyPublisher publisher=new MyPublisher();
  4. Subscriber<News>consumer1, consumer2, consumer3;
  5. Set<Integer> sports = new HashSet();
  6. sports.add(News.SPORTS);
  7. consumer1=new Consumer("Sport Consumer",sports);
  8. Set<Integer> science = new HashSet();
  9. science.add(News.SCIENCE);
  10. consumer2=new Consumer("Science Consumer", science);
  11. Set<Integer> all = new HashSet();
  12. all.add(News.ECONOMIC);
  13. all.add(News.SCIENCE);
  14. all.add(News.SPORTS);
  15. all.add(News.WORLD);
  16. consumer3=new Consumer("All Consumer", all);
  17. publisher.subscribe(consumer1);
  18. publisher.subscribe(consumer2);
  19. publisher.subscribe(consumer3);
  20. System.out.printf("Main: Start\n");

然后,使用publisher对象将四则新闻(每个类别各一条)发送给消费者。每则新闻之间间隔1秒钟。

  1. News news=new News();
  2. news.setTxt("Basketball news");
  3. news.setCategory(News.SPORTS);
  4. news.setDate(new Date());
  5. publisher.publish(news);
  6. try {
  7. TimeUnit.SECONDS.sleep(1);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. news=new News();
  12. news.setTxt("Money news");
  13. news.setCategory(News.ECONOMIC);
  14. news.setDate(new Date());
  15. publisher.publish(news);
  16. try {
  17. TimeUnit.SECONDS.sleep(1);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. news=new News();
  22. news.setTxt("Europe news");
  23. news.setCategory(News.WORLD);
  24. news.setDate(new Date());
  25. publisher.publish(news);
  26. try {
  27. TimeUnit.SECONDS.sleep(1);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. news=new News();
  32. news.setTxt("Space news");
  33. news.setCategory(News.SCIENCE);
  34. news.setDate(new Date());
  35. publisher.publish(news);

最后,使用publisher对象的shutdown()方法完成系统所有要素的执行。

  1. publisher.shutdown();
  2. System.out.printf("Main: End\n");
  3. }
  4. }

下面的屏幕截图显示了本例执行时的一部分输出结果。可以看到consumer3对象接收了所有新闻,但是consumer1consumer2对象只接收相关类别的新闻。

10.3 第二个例子:新闻系统 - 图1