4.3 第二个例子:执行周期性任务

在前面含有执行器的例子中,各任务都被执行一次,而且都被尽快执行。执行器框架包括了其他一些执行器实现,这使我们在任务的执行时间上有了更多的灵活性。ScheduledThreadPoolExecutor类使我们可以周期性地执行任务,或者经过某一延时后执行任务。

本节将通过实现一个RSS订阅程序,促使你学会如何执行周期性任务。在这个简单的例子中,需要定期执行同一任务(阅读RSS订阅上的新闻)。我们的例子有如下几个特征。

  • 在文件中存储RSS源。我们从一些重要的报纸(例如《纽约时报》《每日新闻》《卫报》等)上选取了一些世界新闻。
  • 对每个RSS源,我们向执行器发送一个Runnable对象。每当执行器运行对象时,它会解析RSS源并且将其转换成一个含有RSS内容的CommonInformationItem对象列表。
  • 我们使用生产者/消费者设计模式将RSS新闻写入磁盘。生产者是执行器的任务,它们将每个CommonInformationItem写入到缓存中。缓存中仅存储新条目。消费者是一个独立线程,它从缓存中读取新闻并将其写入磁盘。

从任务执行结束到下一次执行的时间间隔是1分钟。

我们还实现了这个例子的高级版本,在该版本中,一个任务的两次执行之间的时间间隔是可变的。

4.3.1 公共部件

正如前面提过的,我们读取一个RSS订阅并将其转换成一个对象列表。为了解析该RSS文件,将其视为一个XML文件,而且在RSSDataCapturer类中实现了一个SAX(Simple API for XML的缩写)解析器。它可以解析该文件并且创建一个CommonInformationItem列表。该类为每个RSS项都存储了下述信息。

  • Title:RSS项的标题。
  • Date:RSS项的日期。
  • Link:RSS项的链接。
  • Description:RSS项的文本描述。
  • ID:RSS项的ID。如果该项并不含有ID,那么我们还可以计算其ID。
  • Source:RSS源的名称。

由于使用生产者/消费者设计模式将新闻存入磁盘,因此需要用缓存储存新闻,而且在本例中还需要一个Consumer类,该类可从缓存中读取新闻并将其写入磁盘。

我们在NewsBuffer类中实现了缓存,该类有两个内部属性。

  • LinkedBlockingQueue:这是一个带有阻塞操作的并发数据结构。如果从列表中获取某个项但是列表为空,那么调用方法的线程就会被阻塞,直到列表中有元素为止。我们使用这种结构存储CommonInformationItems
  • ConcurrentHashMap:这是HashMap的一个并发实现。它用来存储之前在缓存中存放的各新闻项的ID。

我们将只插入那些此前并未插入缓存中的新闻。

  1. public class NewsBuffer {
  2. private LinkedBlockingQueue<CommonInformationItem> buffer;
  3. private ConcurrentHashMap<String, String> storedItems;
  4. public NewsBuffer() {
  5. buffer=new LinkedBlockingQueue<>();
  6. storedItems=new ConcurrentHashMap<String, String>();
  7. }

我们在NewsBuffer类中给出了两种方法。其中一种方法用于将某一项存储到缓存,并预先检查该项此前是否已经插入;另一种方法用于从缓存中获取下一项。使用compute()方法将元素插入ConcurrentHashMap。该方法接收一个lambda表达式作为参数,其中含有键和与键相关联的实际值(如果该键没有相关联的值则为null)。在我们的例子中,将此前并未处理过的项加入缓存。我们使用add()方法和take()方法插入、获取和删除队列中的元素。

  1. public void add (CommonInformationItem item) {
  2. storedItems.compute(item.getId(), (id, oldSource) -> {
  3. if(oldSource == null) {
  4. buffer.add(item);
  5. return item.getSource();
  6. } else {
  7. System.out.println("Item "+item.getId()+" has been processed
  8. before");
  9. return oldSource;
  10. }
  11. });
  12. }
  13. public CommonInformationItem get() throws InterruptedException {
  14. return buffer.take();
  15. }

缓存的项可通过NewsWriter类写入磁盘,该类将作为一个独立线程执行。该类只有一个内部属性,该属性指向在应用程序中使用的NewsBuffer类。

  1. public class NewsWriter implements Runnable {
  2. private NewsBuffer buffer;
  3. public NewsWriter(NewsBuffer buffer) {
  4. this.buffer=buffer;
  5. }

Runnable对象的run()方法从缓存中获取CommonInformationItem实例并且将其保存到磁盘。与使用阻塞型方法一样,如果该缓存为空,则该线程将被阻塞,直到缓存中有元素为止。

  1. public void run() {
  2. try {
  3. while (!Thread.currentThread().interrupted()) {
  4. CommonInformationItem item=buffer.get();
  5. Path path=Paths.get ("output\\"+item.getFileName());
  6. try (BufferedWriter fileWriter = Files.newBufferedWriter
  7. (path, StandardOpenOption.CREATE)) {
  8. fileWriter.write(item.toString());
  9. } catch (IOException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. } catch (InterruptedException e) {
  14. // 正常执行
  15. }
  16. }

4.3.2 基础阅读器

该基础阅读器将使用标准的ScheduledThreadPoolExecutor类执行周期性任务。我们对每个RSS源都执行一个任务,而且从任务执行结束到下次执行开始间隔时间为一分钟。这些并发任务都是在NewsTask类中实现的,该类有三个内部属性,用于存储RSS订阅的名称、URL,以及用于存储新闻的NewsBuffer类。

  1. public class NewsTask implements Runnable {
  2. private String name;
  3. private String url;
  4. private NewsBuffer buffer;
  5. public NewsTask (String name, String url, NewsBuffer buffer) {
  6. this.name=name;
  7. this.url=url;
  8. this.buffer=buffer;
  9. }

Runnable对象的run()方法直接解析RSS订阅,获取一个CommonItemInterface实例列表,并将它们存储到缓存中。该方法将周期性执行。每次执行时,该run()方法都将从开始执行到结束。

  1. @Override
  2. public void run() {
  3. System.out.println(name + " : Running. " + new Date());
  4. RSSDataCapturer capturer = new RSSDataCapturer(name);
  5. List<CommonInformationItem> items=capturer.load(url);
  6. for (CommonInformationItem item: items) {
  7. buffer.add(item);
  8. }
  9. }

在本例中,还实现了另一个线程,以完成执行器和任务的初始化,然后等待执行结束。我们已将这个类命名为NewsSystem。该类有三个内部属性:用于存储含有RSS源的文件路径、用于存放新闻的缓存、以及一个控制其执行结束的CountDownLatch对象。CountDownLatch类是一种同步机制,允许存在一个线程等待某一事件。第11章将详细介绍该类的用途。我们有如下代码。

  1. public class NewsSystem implements Runnable {
  2. private String route;
  3. private ScheduledThreadPoolExecutor executor;
  4. private NewsBuffer buffer;
  5. private CountDownLatch latch=new CountDownLatch(1);
  6. public NewsSystem(String route) {
  7. this.route = route;
  8. executor = new ScheduledThreadPoolExecutor
  9. (Runtime.getRuntime().availableProcessors());
  10. buffer=new NewsBuffer();
  11. }

run()方法中,我们读取了所有的RSS源,为每个RSS源创建了一个NewsTask类,并且将它们发送给ScheduledThreadPool执行器。使用Executors类的newScheduledThreadPool()方法创建执行器,并使用scheduleAtFixedDelay()方法将任务发送给该执行器,也将NewsWriter实例作为一个线程启动。run()方法等待通知消息,在收到通知后采用CountDownLatch类的await()方法结束其执行,并且结束NewsWriter任务和ScheduledExecutor的执行。

  1. @Override
  2. public void run() {
  3. Path file = Paths.get(route);
  4. NewsWriter newsWriter=new NewsWriter(buffer);
  5. Thread t=new Thread(newsWriter);
  6. t.start();
  7. try (InputStream in = Files.newInputStream(file);
  8. BufferedReader reader = new BufferedReader(new
  9. InputStreamReader(in))) {
  10. String line = null;
  11. while ((line = reader.readLine()) != null) {
  12. String data[] = line.split(";");
  13. NewsTask task = new NewsTask(data[0], data[1], buffer);
  14. System.out.println("Task "+task.getName());
  15. executor.scheduleWithFixedDelay(task,0, 1,
  16. TimeUnit.MINUTES);
  17. }
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }
  21. synchronized (this) {
  22. try {
  23. latch.await();
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. System.out.println("Shutting down the executor.");
  29. executor.shutdown();
  30. t.interrupt();
  31. System.out.println("The system has finished.");
  32. }

我们还实现了shutdown()方法。该方法将告知NewsSystem类必须使用CountDownLatch类的countDown()方法停止其执行过程。该方法将会唤醒run()方法,这样就可以关闭正在运行NewsTask对象的执行器。

  1. public void shutdown() {
  2. latch.countDown();
  3. }

本例最后一个类是Main类,它实现了该例中的main()方法。该类将NewsSystem实例作为一个线程启动,等待10分钟后,通知线程结束执行,进而结束整个系统的执行,如下所示。

  1. public class Main {
  2. public static void main(String[] args) {
  3. // 创建system并将其作为一个线程执行
  4. NewsSystem system=new NewsSystem("data\\sources.txt");
  5. Thread t=new Thread(system);
  6. t.start();
  7. // 等待10分钟
  8. try {
  9. TimeUnit.MINUTES.sleep(10);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. // 通知system终止
  14. system.shutdown();
  15. }

执行本例时,可以看到各个任务如何以周期性方式执行,以及新闻项如何写入磁盘,如下图所示。

4.3 第二个例子:执行周期性任务 - 图1

4.3.3 高级阅读器

基础新闻阅读器是一个使用ScheduledThreadPoolExecutor类的例子,不过我们可以更深入。与使用ThreadPoolExecutor的情形一样,可以实现自己的ScheduledThreadPoolExecutor获得特定行为。在我们的例子中,希望周期性任务的延迟时间随着一天中的时刻而改变。在这部分,你将学到如何实现这一行为。

第一步是实现一个类,用于获取一个周期性任务两次执行之间的时延。我们将其命名为Timer类。该类只有一个名为getPeriod()的静态方法,该方法返回的是本次执行结束到下次执行开始之间的毫秒数。下面是实现过程,不过也可以自己编写代码。

  1. public class Timer {
  2. public static long getPeriod() {
  3. Calendar calendar = Calendar.getInstance();
  4. int hour = calendar.get(Calendar.HOUR_OF_DAY);
  5. if ((hour >= 6) && (hour <= 8)) {
  6. return TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
  7. }
  8. if ((hour >= 13) && (hour <= 14)) {
  9. return TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
  10. }
  11. if ((hour >= 20) && (hour <= 22)) {
  12. return TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
  13. }
  14. return TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
  15. }
  16. }

接下来,还要实现执行器的内部任务。向执行器发送一个Runnable对象时,从外部来讲,可以将该对象视为并发任务,但是执行器会将该对象转换成另一个任务,即FutureTask类的一个实例,转换方法包括用于执行任务的run()方法和用于管理任务执行的Future接口中的方法。为了实现这个例子,必须实现一个类用以扩展FutureTask类,而且因为将在预定的执行器中执行这些任务,必须实现RunnableScheduledFuture接口。该接口提供了getDelay()方法,该方法返回了距离任务下一次执行所剩余的时间。我们已在ExecutorTask类中实现了这些内部任务,该类有如下四个内部属性。

  • ScheduledThreadPoolExecutor类创建的初始RunnableScheduledFuture内部任务。
  • 执行该任务的预定执行器。
  • 该任务下一次执行的起始时间。
  • RSS订阅的名称。

代码如下所示。

  1. public class ExecutorTask<V> extends FutureTask<V> implements
  2. RunnableScheduledFuture<V> {
  3. private RunnableScheduledFuture<V> task;
  4. private NewsExecutor executor;
  5. private long startDate;
  6. private String name;
  7. public ExecutorTask(Runnable runnable, V result,
  8. RunnableScheduledFuture<V> task,
  9. NewsExecutor executor) {
  10. super(runnable, result);
  11. this.task = task;
  12. this.executor = executor;
  13. this.name=((NewsTask)runnable).getName();
  14. this.startDate=new Date().getTime();
  15. }

我们在该类中重载或者实现了不同的方法。第一个是getDelay()方法,如前所述,该方法在给定的时间单位内返回距离任务下次执行的剩余时间。

  1. @Override
  2. public long getDelay(TimeUnit unit) {
  3. long delay;
  4. if (!isPeriodic()) {
  5. delay = task.getDelay(unit);
  6. } else {
  7. if (startDate == 0) {
  8. delay = task.getDelay(unit);
  9. } else {
  10. Date now = new Date();
  11. delay = startDate - now.getTime();
  12. delay = unit.convert(delay, TimeUnit.MILLISECONDS);
  13. }
  14. }
  15. return delay;
  16. }

接下来是用于对比两个任务的compareTo()方法,主要考量这两个任务下次执行的起始时间。

  1. @Override
  2. public int compareTo(Delayed object) {
  3. return Long.compare(this.getStartDate(),
  4. ((ExecutorTask<V>)object).getStartDate());
  5. }

然后,如果任务是周期性的,则isPeriodic()方法返回true,否则返回false

  1. @Override
  2. public boolean isPeriodic() {
  3. return task.isPeriodic();
  4. }

最后,在run()方法中实现了本例最重要的部分。首先,调用FutureTask类的runAndReset()方法。该方法执行任务并且重置其状态,这样任务就可以再次执行。然后,使用Timer类计算下次执行的起始时间。最后,还要在ScheduledThreadPoolExecutor类的队列中再次插入该任务。如果不做最后一步,那么任务就不会像如下所示这样再次执行。

  1. @Override
  2. public void run() {
  3. if (isPeriodic() && (!executor.isShutdown())) {
  4. super.runAndReset();
  5. Date now=new Date();
  6. startDate=now.getTime()+Timer.getPeriod();
  7. executor.getQueue().add(this);
  8. System.out.println("Start Date: "+new Date(startDate));
  9. }
  10. }

一旦有了面向执行器的任务后,则必须实现执行器。我们实现了NewsExecutor类,用以扩展ScheduledThreadPoolExecutor类。我们重载了decorateTask()方法,有了该方法,就可以替换预定执行器使用的内部任务。默认情况下,该方法返回RunnableScheduledFuture接口的默认实现,但是在我们的例子中,它将返回Executor扩展类的一个实例。

  1. public class NewsExecutor extends ScheduledThreadPoolExecutor {
  2. public NewsExecutor(int corePoolSize) {
  3. super(corePoolSize);
  4. }
  5. @Override
  6. protected <V> RunnableScheduledFuture<V> decorateTask(Runnable
  7. runnable, RunnableScheduledFuture<V> task) {
  8. ExecutorTask<V> myTask = new ExecutorTask<>(runnable, null,
  9. task, this);
  10. return myTask;
  11. }
  12. }

为了实现NewsSystem的其他版本,以及使用NewsExecutorMain类,我们实现了NewsAdvancedSystemAdvancedMain

现在,你可以运行高级新闻系统,查看各次执行之间延迟时间的变化。