10.2 第一个例子:面向事件通知的集中式系统

该示例将实现一个系统,把来自事件生成器的条目发送给事件的消费者。我们将使用SubmissionPublisher类实现事件的生产者和消费者之间的通信。

10.2.1 Event

该类存储了每个条目的信息。每个条目包含了三个属性。

  • msg属性,用于在Event对象中存储消息。
  • source属性,用于存储生成Event对象的类的名称。
  • date属性,用于存储Event生成的日期。

必须将这三个属性声明为private,并且在该类中包含相应的get()方法和set()方法。

10.2.2 Producer

我们将使用该类实现生成事件的任务,这些任务将通过SubmissionPublisher对象发送给消费者。该类实现了Runnable接口,并且存储了两个属性。

  • publisher属性:该属性存储SubmissionPublisher对象,将事件发送给消费者。
  • name属性:该属性存储了生产者的名称。

使用该类的构造函数初始化这两个属性。

  1. public class Producer implements Runnable {
  2. private SubmissionPublisher<Event> publisher;
  3. private String name;
  4. public Producer(SubmissionPublisher<Event> publisher, String name) {
  5. this.publisher = publisher;
  6. this.name = name;
  7. }

然后,实现run()方法。在该方法中,生成10个事件。在一个事件和下一事件之间,随机等待一个随机秒数(0到10之间)。该方法的源代码如下:

  1. @Override
  2. public void run() {
  3. Random random = new Random();
  4. for (int i=0 ; i < 10; i++) {
  5. Event event = new Event();
  6. event.setMsg("Event number "+i);
  7. event.setSource(this.name);
  8. event.setDate(new Date());
  9. publisher.submit(event);
  10. int number = random.nextInt(10);
  11. try {
  12. TimeUnit.SECONDS.sleep(number);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. }

10.2.3 Consumer

现在,在Consumer类中实现事件的消费者。这个类实现了采用Event类参数化的Flow.Subscriber接口,因此必须实现该接口提供的四种方法。

首先,声明两个属性。

  • name属性,用于存储消费者的名称。
  • subscription属性,用于存储Flow.Subscription实例,该实例负责管理消费者与生产者之间的通信。

使用该类的构造函数初始化name属性,如以下代码片段所示:

  1. public class Consumer implements Subscriber<Event> {
  2. private String name;
  3. private Subscription subscription;
  4. public Consumer (String name) {
  5. this.name = name;
  6. }

现在,实现Flow.Subscriber接口的四种方法。onComplete()方法和onError()方法只将信息显示到控制台。

  1. @Override
  2. public void onComplete() {
  3. this.showMessage("No more events");
  4. }
  5. @Override
  6. public void onError(Throwable error) {
  7. this.showMessage("An error has ocurred");
  8. error.printStackTrace();
  9. }

当消费者希望订阅其通知时,SubmissionPublisher类将调用onSubscribe()方法,作为参数传递的Subscription对象将存放在subscription属性中,然后我们使用request()方法向发布者请求第一条消息。最后,在控制台输出消息。

  1. @Override
  2. public void onSubscribe(Subscription subscription) {
  3. this.subscription=subscription;
  4. this.subscription.request(1);
  5. this.showMessage("Subscription OK");
  6. }

最后,对于每个事件,SubmissionPublisher类都将调用onNext()方法。我们在控制台中显示该事件的信息,使用request()方法请求下一个事件,并且调用辅助方法proccesEvent()

  1. @Override
  2. public void onNext(Event event) {
  3. this.showMessage("An event has arrived: "+event.getSource()+":
  4. "+event.getDate()+": "+event.getMsg());
  5. this.subscription.request(1);
  6. processEvent(event);
  7. }

使用processEvent()方法模拟消费者处理事件的时间。随机等待0到3秒以实现这一行为。

  1. private void processEvent(Event event) {
  2. Random random = new Random();
  3. int number = random.nextInt(3);
  4. try {
  5. TimeUnit.SECONDS.sleep(number);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. }

最后,必须实现上一个方法中使用的辅助方法showMessage()。它显示了参数中字符串的内容,其中含有执行消费者的线程的名称,以及消费者的名称。

  1. private void showMessage (String txt) {
  2. System.out.println(Thread.currentThread().getName()+":"+this
  3. .name+":"+txt);
  4. }
  5. }

10.2.4 Main

最后,实现Main类,其中含有创建并运行该示例所有组件的main()方法。

创建以下元素。

  • 一个名为publisherSubmissionPublisher对象。我们将使用该对象将事件发送给消费者。
  • 五个Consumer对象,它们将接收发布者创建的所有事件。我们使用subscribe()方法向发布者订阅消费者。
  • 两个Producer对象,它们将生成事件,并使用publisher对象将事件发送给消费者。我们使用JVM提供的默认ForkJoinPool对象执行生产者对象,并使用commonPool()方法获取ForkJoinPool对象,并且使用submit()方法执行它们。
  1. public class Main {
  2. public static void main(String[] args) {
  3. SubmissionPublisher<Event> publisher = new SubmissionPublisher();
  4. for (int i = 0; i < 5; i++) {
  5. Consumer consumer = new Consumer("Consumer "+i);
  6. publisher.subscribe(consumer);
  7. }
  8. Producer system1 = new Producer(publisher, "System 1");
  9. Producer system2 = new Producer(publisher, "System 2");
  10. ForkJoinTask<?>task1 = ForkJoinPool.commonPool().submit(system1);
  11. ForkJoinTask<?>task2 = ForkJoinPool.commonPool().submit(system2);

然后,给出一个while循环,该循环每10秒输出有关任务和发布者对象的信息,代码块如下:

  1. do {
  2. System.out.println("Main: Task 1: "+task1.isDone());
  3. System.out.println("Main: Task 2: "+task2.isDone());
  4. System.out.println("Publisher: MaximunLag:"+
  5. publisher.estimateMaximumLag());
  6. System.out.println("Publisher: Max Buffer Capacity: "+
  7. publisher.getMaxBufferCapacity());
  8. try {
  9. TimeUnit.SECONDS.sleep(10);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. } while ((!task1.isDone()) || (!task2.isDone()) ||
  14. (publisher.estimateMaximumLag() > 0));

为了完成循环的执行,要等待三个条件。

  • 执行第一个生产者对象的任务完成执行。
  • 执行第二个生产者对象的任务完成执行。
  • SubmissionPublisher对象中再没有未处理事件。使用estimateMaximumLag()方法获取该数值。

最后,使用SubmissionPublisher对象的close()方法通知订阅者执行结束。

在本例的执行过程中,生产者使用submit()方法将事件发送给SubmissionPublisher,而SubmissionPublisher又将事件发送给不同的消费者。每个消费者都使用request()方法逐个请求事件。

下面的屏幕截图显示了该程序执行一次得到的部分输出。

10.2 第一个例子:面向事件通知的集中式系统 - 图1

可以看到main()方法如何输出有关任务和publisher对象的信息,用户如何接收不同的事件,以及最后main()方法调用SubmissionPublisher对象的close()方法时,如何输出由其调用的onComplete()方法所输出的消息。