4.3 第二个例子:执行周期性任务
在前面含有执行器的例子中,各任务都被执行一次,而且都被尽快执行。执行器框架包括了其他一些执行器实现,这使我们在任务的执行时间上有了更多的灵活性。ScheduledThreadPoolExecutor类使我们可以周期性地执行任务,或者经过某一延时后执行任务。
本节将通过实现一个RSS订阅程序,促使你学会如何执行周期性任务。在这个简单的例子中,需要定期执行同一任务(阅读RSS订阅上的新闻)。我们的例子有如下几个特征。
- 在文件中存储RSS源。我们从一些重要的报纸(例如《纽约时报》《每日新闻》《卫报》等)上选取了一些世界新闻。
- 对每个RSS源,我们向执行器发送一个
Runnable对象。每当执行器运行对象时,它会解析RSS源并且将其转换成一个含有RSS内容的CommonInformationItem对象列表。 - 我们使用生产者/消费者设计模式将RSS新闻写入磁盘。生产者是执行器的任务,它们将每个
CommonInformationItem写入到缓存中。缓存中仅存储新条目。消费者是一个独立线程,它从缓存中读取新闻并将其写入磁盘。
从任务执行结束到下一次执行的时间间隔是1分钟。
我们还实现了这个例子的高级版本,在该版本中,一个任务的两次执行之间的时间间隔是可变的。
4.3.1 公共部件
正如前面提过的,我们读取一个RSS订阅并将其转换成一个对象列表。为了解析该RSS文件,将其视为一个XML文件,而且在RSSDataCapturer类中实现了一个SAX(Simple API for XML的缩写)解析器。它可以解析该文件并且创建一个CommonInformationItem列表。该类为每个RSS项都存储了下述信息。
- Title:RSS项的标题。
- Date:RSS项的日期。
- Link:RSS项的链接。
- Description:RSS项的文本描述。
- ID:RSS项的ID。如果该项并不含有ID,那么我们还可以计算其ID。
- Source:RSS源的名称。
由于使用生产者/消费者设计模式将新闻存入磁盘,因此需要用缓存储存新闻,而且在本例中还需要一个Consumer类,该类可从缓存中读取新闻并将其写入磁盘。
我们在NewsBuffer类中实现了缓存,该类有两个内部属性。
LinkedBlockingQueue:这是一个带有阻塞操作的并发数据结构。如果从列表中获取某个项但是列表为空,那么调用方法的线程就会被阻塞,直到列表中有元素为止。我们使用这种结构存储CommonInformationItems。ConcurrentHashMap:这是HashMap的一个并发实现。它用来存储之前在缓存中存放的各新闻项的ID。
我们将只插入那些此前并未插入缓存中的新闻。
public class NewsBuffer {private LinkedBlockingQueue<CommonInformationItem> buffer;private ConcurrentHashMap<String, String> storedItems;public NewsBuffer() {buffer=new LinkedBlockingQueue<>();storedItems=new ConcurrentHashMap<String, String>();}
我们在NewsBuffer类中给出了两种方法。其中一种方法用于将某一项存储到缓存,并预先检查该项此前是否已经插入;另一种方法用于从缓存中获取下一项。使用compute()方法将元素插入ConcurrentHashMap。该方法接收一个lambda表达式作为参数,其中含有键和与键相关联的实际值(如果该键没有相关联的值则为null)。在我们的例子中,将此前并未处理过的项加入缓存。我们使用add()方法和take()方法插入、获取和删除队列中的元素。
public void add (CommonInformationItem item) {storedItems.compute(item.getId(), (id, oldSource) -> {if(oldSource == null) {buffer.add(item);return item.getSource();} else {System.out.println("Item "+item.getId()+" has been processedbefore");return oldSource;}});}public CommonInformationItem get() throws InterruptedException {return buffer.take();}
缓存的项可通过NewsWriter类写入磁盘,该类将作为一个独立线程执行。该类只有一个内部属性,该属性指向在应用程序中使用的NewsBuffer类。
public class NewsWriter implements Runnable {private NewsBuffer buffer;public NewsWriter(NewsBuffer buffer) {this.buffer=buffer;}
该Runnable对象的run()方法从缓存中获取CommonInformationItem实例并且将其保存到磁盘。与使用阻塞型方法一样,如果该缓存为空,则该线程将被阻塞,直到缓存中有元素为止。
public void run() {try {while (!Thread.currentThread().interrupted()) {CommonInformationItem item=buffer.get();Path path=Paths.get ("output\\"+item.getFileName());try (BufferedWriter fileWriter = Files.newBufferedWriter(path, StandardOpenOption.CREATE)) {fileWriter.write(item.toString());} catch (IOException e) {e.printStackTrace();}}} catch (InterruptedException e) {// 正常执行}}
4.3.2 基础阅读器
该基础阅读器将使用标准的ScheduledThreadPoolExecutor类执行周期性任务。我们对每个RSS源都执行一个任务,而且从任务执行结束到下次执行开始间隔时间为一分钟。这些并发任务都是在NewsTask类中实现的,该类有三个内部属性,用于存储RSS订阅的名称、URL,以及用于存储新闻的NewsBuffer类。
public class NewsTask implements Runnable {private String name;private String url;private NewsBuffer buffer;public NewsTask (String name, String url, NewsBuffer buffer) {this.name=name;this.url=url;this.buffer=buffer;}
该Runnable对象的run()方法直接解析RSS订阅,获取一个CommonItemInterface实例列表,并将它们存储到缓存中。该方法将周期性执行。每次执行时,该run()方法都将从开始执行到结束。
@Overridepublic void run() {System.out.println(name + " : Running. " + new Date());RSSDataCapturer capturer = new RSSDataCapturer(name);List<CommonInformationItem> items=capturer.load(url);for (CommonInformationItem item: items) {buffer.add(item);}}
在本例中,还实现了另一个线程,以完成执行器和任务的初始化,然后等待执行结束。我们已将这个类命名为NewsSystem。该类有三个内部属性:用于存储含有RSS源的文件路径、用于存放新闻的缓存、以及一个控制其执行结束的CountDownLatch对象。CountDownLatch类是一种同步机制,允许存在一个线程等待某一事件。第11章将详细介绍该类的用途。我们有如下代码。
public class NewsSystem implements Runnable {private String route;private ScheduledThreadPoolExecutor executor;private NewsBuffer buffer;private CountDownLatch latch=new CountDownLatch(1);public NewsSystem(String route) {this.route = route;executor = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors());buffer=new NewsBuffer();}
在run()方法中,我们读取了所有的RSS源,为每个RSS源创建了一个NewsTask类,并且将它们发送给ScheduledThreadPool执行器。使用Executors类的newScheduledThreadPool()方法创建执行器,并使用scheduleAtFixedDelay()方法将任务发送给该执行器,也将NewsWriter实例作为一个线程启动。run()方法等待通知消息,在收到通知后采用CountDownLatch类的await()方法结束其执行,并且结束NewsWriter任务和ScheduledExecutor的执行。
@Overridepublic void run() {Path file = Paths.get(route);NewsWriter newsWriter=new NewsWriter(buffer);Thread t=new Thread(newsWriter);t.start();try (InputStream in = Files.newInputStream(file);BufferedReader reader = new BufferedReader(newInputStreamReader(in))) {String line = null;while ((line = reader.readLine()) != null) {String data[] = line.split(";");NewsTask task = new NewsTask(data[0], data[1], buffer);System.out.println("Task "+task.getName());executor.scheduleWithFixedDelay(task,0, 1,TimeUnit.MINUTES);}} catch (Exception e) {e.printStackTrace();}synchronized (this) {try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("Shutting down the executor.");executor.shutdown();t.interrupt();System.out.println("The system has finished.");}
我们还实现了shutdown()方法。该方法将告知NewsSystem类必须使用CountDownLatch类的countDown()方法停止其执行过程。该方法将会唤醒run()方法,这样就可以关闭正在运行NewsTask对象的执行器。
public void shutdown() {latch.countDown();}
本例最后一个类是Main类,它实现了该例中的main()方法。该类将NewsSystem实例作为一个线程启动,等待10分钟后,通知线程结束执行,进而结束整个系统的执行,如下所示。
public class Main {public static void main(String[] args) {// 创建system并将其作为一个线程执行NewsSystem system=new NewsSystem("data\\sources.txt");Thread t=new Thread(system);t.start();// 等待10分钟try {TimeUnit.MINUTES.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}// 通知system终止system.shutdown();}
执行本例时,可以看到各个任务如何以周期性方式执行,以及新闻项如何写入磁盘,如下图所示。

4.3.3 高级阅读器
基础新闻阅读器是一个使用ScheduledThreadPoolExecutor类的例子,不过我们可以更深入。与使用ThreadPoolExecutor的情形一样,可以实现自己的ScheduledThreadPoolExecutor获得特定行为。在我们的例子中,希望周期性任务的延迟时间随着一天中的时刻而改变。在这部分,你将学到如何实现这一行为。
第一步是实现一个类,用于获取一个周期性任务两次执行之间的时延。我们将其命名为Timer类。该类只有一个名为getPeriod()的静态方法,该方法返回的是本次执行结束到下次执行开始之间的毫秒数。下面是实现过程,不过也可以自己编写代码。
public class Timer {public static long getPeriod() {Calendar calendar = Calendar.getInstance();int hour = calendar.get(Calendar.HOUR_OF_DAY);if ((hour >= 6) && (hour <= 8)) {return TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);}if ((hour >= 13) && (hour <= 14)) {return TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);}if ((hour >= 20) && (hour <= 22)) {return TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);}return TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);}}
接下来,还要实现执行器的内部任务。向执行器发送一个Runnable对象时,从外部来讲,可以将该对象视为并发任务,但是执行器会将该对象转换成另一个任务,即FutureTask类的一个实例,转换方法包括用于执行任务的run()方法和用于管理任务执行的Future接口中的方法。为了实现这个例子,必须实现一个类用以扩展FutureTask类,而且因为将在预定的执行器中执行这些任务,必须实现RunnableScheduledFuture接口。该接口提供了getDelay()方法,该方法返回了距离任务下一次执行所剩余的时间。我们已在ExecutorTask类中实现了这些内部任务,该类有如下四个内部属性。
- 由
ScheduledThreadPoolExecutor类创建的初始RunnableScheduledFuture内部任务。 - 执行该任务的预定执行器。
- 该任务下一次执行的起始时间。
- RSS订阅的名称。
代码如下所示。
public class ExecutorTask<V> extends FutureTask<V> implementsRunnableScheduledFuture<V> {private RunnableScheduledFuture<V> task;private NewsExecutor executor;private long startDate;private String name;public ExecutorTask(Runnable runnable, V result,RunnableScheduledFuture<V> task,NewsExecutor executor) {super(runnable, result);this.task = task;this.executor = executor;this.name=((NewsTask)runnable).getName();this.startDate=new Date().getTime();}
我们在该类中重载或者实现了不同的方法。第一个是getDelay()方法,如前所述,该方法在给定的时间单位内返回距离任务下次执行的剩余时间。
@Overridepublic long getDelay(TimeUnit unit) {long delay;if (!isPeriodic()) {delay = task.getDelay(unit);} else {if (startDate == 0) {delay = task.getDelay(unit);} else {Date now = new Date();delay = startDate - now.getTime();delay = unit.convert(delay, TimeUnit.MILLISECONDS);}}return delay;}
接下来是用于对比两个任务的compareTo()方法,主要考量这两个任务下次执行的起始时间。
@Overridepublic int compareTo(Delayed object) {return Long.compare(this.getStartDate(),((ExecutorTask<V>)object).getStartDate());}
然后,如果任务是周期性的,则isPeriodic()方法返回true,否则返回false。
@Overridepublic boolean isPeriodic() {return task.isPeriodic();}
最后,在run()方法中实现了本例最重要的部分。首先,调用FutureTask类的runAndReset()方法。该方法执行任务并且重置其状态,这样任务就可以再次执行。然后,使用Timer类计算下次执行的起始时间。最后,还要在ScheduledThreadPoolExecutor类的队列中再次插入该任务。如果不做最后一步,那么任务就不会像如下所示这样再次执行。
@Overridepublic void run() {if (isPeriodic() && (!executor.isShutdown())) {super.runAndReset();Date now=new Date();startDate=now.getTime()+Timer.getPeriod();executor.getQueue().add(this);System.out.println("Start Date: "+new Date(startDate));}}
一旦有了面向执行器的任务后,则必须实现执行器。我们实现了NewsExecutor类,用以扩展ScheduledThreadPoolExecutor类。我们重载了decorateTask()方法,有了该方法,就可以替换预定执行器使用的内部任务。默认情况下,该方法返回RunnableScheduledFuture接口的默认实现,但是在我们的例子中,它将返回Executor扩展类的一个实例。
public class NewsExecutor extends ScheduledThreadPoolExecutor {public NewsExecutor(int corePoolSize) {super(corePoolSize);}@Overrideprotected <V> RunnableScheduledFuture<V> decorateTask(Runnablerunnable, RunnableScheduledFuture<V> task) {ExecutorTask<V> myTask = new ExecutorTask<>(runnable, null,task, this);return myTask;}}
为了实现NewsSystem的其他版本,以及使用NewsExecutor的Main类,我们实现了NewsAdvancedSystem和AdvancedMain。
现在,你可以运行高级新闻系统,查看各次执行之间延迟时间的变化。
