10.2 第一个例子:面向事件通知的集中式系统
该示例将实现一个系统,把来自事件生成器的条目发送给事件的消费者。我们将使用SubmissionPublisher类实现事件的生产者和消费者之间的通信。
10.2.1 Event类
该类存储了每个条目的信息。每个条目包含了三个属性。
msg属性,用于在Event对象中存储消息。source属性,用于存储生成Event对象的类的名称。date属性,用于存储Event生成的日期。
必须将这三个属性声明为private,并且在该类中包含相应的get()方法和set()方法。
10.2.2 Producer类
我们将使用该类实现生成事件的任务,这些任务将通过SubmissionPublisher对象发送给消费者。该类实现了Runnable接口,并且存储了两个属性。
publisher属性:该属性存储SubmissionPublisher对象,将事件发送给消费者。name属性:该属性存储了生产者的名称。
使用该类的构造函数初始化这两个属性。
public class Producer implements Runnable {private SubmissionPublisher<Event> publisher;private String name;public Producer(SubmissionPublisher<Event> publisher, String name) {this.publisher = publisher;this.name = name;}
然后,实现run()方法。在该方法中,生成10个事件。在一个事件和下一事件之间,随机等待一个随机秒数(0到10之间)。该方法的源代码如下:
@Overridepublic void run() {Random random = new Random();for (int i=0 ; i < 10; i++) {Event event = new Event();event.setMsg("Event number "+i);event.setSource(this.name);event.setDate(new Date());publisher.submit(event);int number = random.nextInt(10);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}}}
10.2.3 Consumer类
现在,在Consumer类中实现事件的消费者。这个类实现了采用Event类参数化的Flow.Subscriber接口,因此必须实现该接口提供的四种方法。
首先,声明两个属性。
name属性,用于存储消费者的名称。subscription属性,用于存储Flow.Subscription实例,该实例负责管理消费者与生产者之间的通信。
使用该类的构造函数初始化name属性,如以下代码片段所示:
public class Consumer implements Subscriber<Event> {private String name;private Subscription subscription;public Consumer (String name) {this.name = name;}
现在,实现Flow.Subscriber接口的四种方法。onComplete()方法和onError()方法只将信息显示到控制台。
@Overridepublic void onComplete() {this.showMessage("No more events");}@Overridepublic void onError(Throwable error) {this.showMessage("An error has ocurred");error.printStackTrace();}
当消费者希望订阅其通知时,SubmissionPublisher类将调用onSubscribe()方法,作为参数传递的Subscription对象将存放在subscription属性中,然后我们使用request()方法向发布者请求第一条消息。最后,在控制台输出消息。
@Overridepublic void onSubscribe(Subscription subscription) {this.subscription=subscription;this.subscription.request(1);this.showMessage("Subscription OK");}
最后,对于每个事件,SubmissionPublisher类都将调用onNext()方法。我们在控制台中显示该事件的信息,使用request()方法请求下一个事件,并且调用辅助方法proccesEvent()。
@Overridepublic void onNext(Event event) {this.showMessage("An event has arrived: "+event.getSource()+":"+event.getDate()+": "+event.getMsg());this.subscription.request(1);processEvent(event);}
使用processEvent()方法模拟消费者处理事件的时间。随机等待0到3秒以实现这一行为。
private void processEvent(Event event) {Random random = new Random();int number = random.nextInt(3);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}}
最后,必须实现上一个方法中使用的辅助方法showMessage()。它显示了参数中字符串的内容,其中含有执行消费者的线程的名称,以及消费者的名称。
private void showMessage (String txt) {System.out.println(Thread.currentThread().getName()+":"+this.name+":"+txt);}}
10.2.4 Main类
最后,实现Main类,其中含有创建并运行该示例所有组件的main()方法。
创建以下元素。
- 一个名为
publisher的SubmissionPublisher对象。我们将使用该对象将事件发送给消费者。 - 五个
Consumer对象,它们将接收发布者创建的所有事件。我们使用subscribe()方法向发布者订阅消费者。 - 两个
Producer对象,它们将生成事件,并使用publisher对象将事件发送给消费者。我们使用JVM提供的默认ForkJoinPool对象执行生产者对象,并使用commonPool()方法获取ForkJoinPool对象,并且使用submit()方法执行它们。
public class Main {public static void main(String[] args) {SubmissionPublisher<Event> publisher = new SubmissionPublisher();for (int i = 0; i < 5; i++) {Consumer consumer = new Consumer("Consumer "+i);publisher.subscribe(consumer);}Producer system1 = new Producer(publisher, "System 1");Producer system2 = new Producer(publisher, "System 2");ForkJoinTask<?>task1 = ForkJoinPool.commonPool().submit(system1);ForkJoinTask<?>task2 = ForkJoinPool.commonPool().submit(system2);
然后,给出一个while循环,该循环每10秒输出有关任务和发布者对象的信息,代码块如下:
do {System.out.println("Main: Task 1: "+task1.isDone());System.out.println("Main: Task 2: "+task2.isDone());System.out.println("Publisher: MaximunLag:"+publisher.estimateMaximumLag());System.out.println("Publisher: Max Buffer Capacity: "+publisher.getMaxBufferCapacity());try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}} while ((!task1.isDone()) || (!task2.isDone()) ||(publisher.estimateMaximumLag() > 0));
为了完成循环的执行,要等待三个条件。
- 执行第一个生产者对象的任务完成执行。
- 执行第二个生产者对象的任务完成执行。
SubmissionPublisher对象中再没有未处理事件。使用estimateMaximumLag()方法获取该数值。
最后,使用SubmissionPublisher对象的close()方法通知订阅者执行结束。
在本例的执行过程中,生产者使用submit()方法将事件发送给SubmissionPublisher,而SubmissionPublisher又将事件发送给不同的消费者。每个消费者都使用request()方法逐个请求事件。
下面的屏幕截图显示了该程序执行一次得到的部分输出。

可以看到main()方法如何输出有关任务和publisher对象的信息,用户如何接收不同的事件,以及最后main()方法调用SubmissionPublisher对象的close()方法时,如何输出由其调用的onComplete()方法所输出的消息。
