5.3 第二个例子:为文档集创建倒排索引

信息检索领域,倒排索引是一种常见的数据结构,用于加快在文档集中查找文本的速度。它存储了文档集的所有单词,以及一个包含这些单词的文档列表。

为构建该索引,我们要解析文档集中的所有文档,并且以增量方式构建索引。对于每个文档来说,我们抽取该文档中的重要单词(删除最常见单词,也叫作停止词,或者也可能应用词干提取算法),并且之后将那些单词加入到索引中。如果一个文档中的某个单词存在于索引之中,就将该文档加入到与该单词相关联的文档列表中。如果文档中的某个单词并不存在于索引之中,那么将该单词加入到索引的单词列表中,并且将该文档与该单词关联起来。可以为这种关联关系加入一些参数,例如文档中单词的“术语频次”,以便提供更多的信息。

当你搜索文档集合中的一个单词或者单词列表时,使用倒排索引来获取与每个单词相关的文档列表,并创建含有搜索结果的一个唯一列表。

本节,你将学会如何使用Java并发程序来为一个文档集构建一个倒排索引文件。至于文档集,我们选用维基百科(Wikipedia)上有关电影信息的页面来构建一个含有100 673个文档的集合。我们将每一个维基百科页面转换成一个文本文件,你可以随本书配套源码一起下载该文档集。

为了构建倒排索引,我们不会删除任何单词,也不会使用任何词干提取算法。我们希望使算法尽可能简单,以便将精力集中于并发程序上。

这里提到的原理同样也可以用于获取有关文档集合的其他信息,例如每个文档的向量表示可用作聚类算法的输入,你将在第7章中学习这些内容。

和其他示例一样,你将实现这些操作的串行版和并发版,以验证在该例中并发处理对我们的帮助。

5.3.1 公共类

串行版和并发版在实现将文档集合加载到Java对象时要用到一些共同的类。我们用到了下面两个类。

  • Document类,用于存放文档中所含单词的列表。
  • DocumentParse类,用于将一个以文件存储的文档转换成一个文档对象。

让我们分析一下这两个类的源代码。

  • Document

Document类非常简单,它只有两个属性以及用于获取和设置属性值的方法。这两个属性如下。

  • 文件名,这是一个字符串。
  • 词汇表(也就是在文档中用到的单词的列表),这是一个HashMap。其键为单词,其值为该单词在文档中出现的次数。
    • DocumentParser

正如前面提到的,该类将以文件存储的文档转换为以Document对象表示的文档。它将单词划分为三个方法。第一个是parse()方法,它接收文件路径作为参数,并且返回一个带有该文档词汇表的HashMap。该方法使用Files类的readAllLines()方法逐行读取文件,并使用parseLine()方法将每一行转换成一个单词列表,并且将其添加到词汇表中,如下所示。

  1. public class DocumentParser {
  2. public Map<String, Integer> parse(String route) {
  3. Map<String, Integer> ret=new HashMap<String,Integer>();
  4. Path file=Paths.get(route);
  5. try {
  6. List<String> lines = Files.readAllLines(file);
  7. for (String line : lines) {
  8. parseLine(line,ret);
  9. }
  10. } catch (IOException e) {
  11. e.printStackTrace();
  12. }
  13. return ret;
  14. }

parseLine()方法处理当前行并抽取其中的单词。为使本例更加简单,我们将单词看作一个字母字符序列。我们使用Pattern类来抽取单词,使用Normalizer类将单词转换成小写形式,并且删除元音的重音符号,如下所示:

  1. private static final Pattern PATTERN = Pattern.compile
  2. ("\\P{IsAlphabetic}+");
  3. private void parseLine(String line, Map<String, Integer> ret) {
  4. for(String word: PATTERN.split(line)) {
  5. if(!word.isEmpty())
  6. ret.merge(Normalizer.normalize(word, Normalizer.Form.NFKD)
  7. .toLowerCase(), 1, (a, b) -> a+b);
  8. }
  9. }

5.3.2 串行版本

本例的串行版本在SerialIndexing类中实现。该类含有main()方法,可以读取所有文档、获取其词汇表,并且以增量方式构建倒排索引。

首先,我们初始化必要的变量。文档集存放在目录data中,因此我们用一个File对象数组来存储所有的文档。我们还初始化了invertedIndex对象。在此用到了一个HashMap,它的键为单词,而它的值是一个字符串对象列表,这些字符串表示的是含有该单词的文件的名称,如下所示:

  1. public class SerialIndexing {
  2. public static void main(String[] args) {
  3. Date start, end;
  4. File source = new File("data");
  5. File[] files = source.listFiles();
  6. Map<String, List<String>> invertedIndex=new
  7. HashMap<String,List<String>> ();

然后,我们使用DocumentParse类来解析所有文档,并且使用updateInvertedIndex()方法将从各个文档获取的词汇表添加到倒排索引中。我们还测量了所有处理过程的执行时间,代码如下所示:

  1. start=new Date();
  2. for (File file : files) {
  3. DocumentParser parser = new DocumentParser();
  4. if (file.getName().endsWith(".txt")) {
  5. Map<String, Integer> voc = parser.parse(file.getAbsolutePath());
  6. updateInvertedIndex(voc,invertedIndex, file.getName());
  7. }
  8. }
  9. end=new Date();

最后,我们在控制台中显示执行结果。

  1. System.out.println("Execution Time: "+(end.getTime()-
  2. start.getTime()));
  3. System.out.println("invertedIndex: "+invertedIndex.size());
  4. }

updateInvertedIndex()方法将一个文档的词汇表添加到倒排索引结构中。它处理所有构成词汇表的单词。如果单词已经存在于倒排索引之中,我们将文档名称添加到与该单词相关联的文档列表之中。如果单词并不存在于倒排索引之中,就将单词加入倒排索引并且将文档与该单词关联起来,如下所示:

  1. private static void updateInvertedIndex(Map<String, Integer> voc,
  2. Map<String, List<String>> invertedIndex, String fileName) {
  3. for (String word : voc.keySet()) {
  4. if (word.length() >= 3) {
  5. invertedIndex.computeIfAbsent(word, k -> new
  6. ArrayList<>()).add(fileName);
  7. }
  8. }
  9. }

5.3.3 第一个并发版本:每个文档一个任务

现在,是实现并发版文本索引算法的时候了。显然,我们可以并行化每个文档的处理。其中包括从文件读取文档和逐行处理以获取文档词汇表。各任务可返回词汇表作为结果,因此我们可以基于Callable接口来实现任务。

在前面的示例中,我们用了三个方法来将Callable任务发送给执行器。

  • submit()
  • invokeAll()
  • invokeAny()

我们要处理所有的文档,因此必须放弃invokeAny()方法。可其他两个方法又很不方便。如果我们使用submit()方法,就必须确定在何时处理任务的结果。如果为每个文档都发送一个任务,就可以以如下方式处理结果。

  • 在发送每个任务后,显然这是不现实的。
  • 在所有任务完成后,这样我们就需要存储大量Future对象。
  • 在发送一组任务后,我们需要编写代码来同步两个操作。

这些方法都有一个问题:我们以顺序方式来处理这些任务的结果。如果使用invokeAll()方法,所处的情形就与第二点相似,我们必须等所有任务都结束。

一个可行的供选方案是创建其他一些任务来处理与每个任务相关的Future对象,而Java并发API提供了一种很好的解决方案,采用CompletionService接口及其实现(即ExecutorCompletionService类)来实现这一解决方案。

CompletionService对象带有一个执行器,它允许你将任务生成和那些任务结果的使用分离开来。你可以使用submit()方法向执行器发送任务,并在这些任务执行完毕后使用poll()或者take()方法来获取其结果。因此,就我们的解决方案而言,将实现下述要素。

  • 一个用于执行任务的CompletionService对象。
  • 为每个文档分配一个任务以解析文档并且生成其词汇表,而该任务将由CompletionService对象来执行。这些任务都在IndexingTask类中实现。
  • 创建两个线程来处理任务结果并且构造倒排索引。这些线程都在InvertedIndexTask类中实现。
  • 一个用于创建和执行所有要素的main()方法。该方法在ConcurrentIndexingMain类中实现。

让我们来分析一下这些类的源代码。

  • IndexingTask

该类实现的任务是解析一个文档来获取其词汇表。该类实现了用Document类参数化的Callable接口。它有一个存储File对象的内部属性,而该File对象代表了它要解析的文档。请看下面的代码:

  1. public class IndexingTask implements Callable<Document> {
  2. private File file;
  3. public IndexingTask(File file) {
  4. this.file=file;
  5. }

call()方法中,直接使用了DocumentParser类的parse()方法来解析文档,获得词汇表,并且根据获得的数据创建和返回Document对象。

  1. @Override
  2. public Document call() throws Exception {
  3. DocumentParser parser = new DocumentParser();
  4. Map<String, Integer> voc = parser.parse(file.getAbsolutePath());
  5. Document document=new Document();
  6. document.setFileName(file.getName());
  7. document.setVoc(voc);
  8. return document;
  9. }
  10. }
  • InvertedIndexTask

该类实现的任务是获取由IndexingTask对象生成的Document对象,并且创建倒排索引。该任务将作为Thread对象来执行(我们在本例中没有使用执行器),因此它是基于Runnable接口的。

InvertedIndexTask类用到了下述三个内部属性。

  • Document类参数化的CompletionService对象,用于访问由IndexingTask对象返回的对象。
  • 用于存储倒排索引的ConcurrentHashMap,其键为单词,而值为一个存放文件名字符串的ConcurrentLinkedDeque。在本例中,我们要使用并发数据结构,而在串行版本中使用的数据结构是没有同步机制的。
  • 一个用于表明任务能够完成其工作的布尔值。
    相关代码如下所示:
  1. public class InvertedIndexTask implements Runnable {
  2. private CompletionService<Document> completionService;
  3. private ConcurrentHashMap<String,
  4. ConcurrentLinkedDeque<String>> invertedIndex;
  5. public InvertedIndexTask(CompletionService<Document>
  6. completionService, ConcurrentHashMap<String,
  7. ConcurrentLinkedDeque<String>> invertedIndex) {
  8. this.completionService = completionService;
  9. this.invertedIndex = invertedIndex;
  10. }

run()方法使用来自CompletionService类的take()方法获取与某一任务相关联的Future对象。我们实现了一个循环,在线程中断之前该循环将一直运行。当该线程中断之后,它会再次使用poll()方法处理所有待处理的Future对象。我们使用updateInvertedIndex()方法以及take()方法返回的对象来更新倒排索引,方法如下所示:

  1. public void run() {
  2. try {
  3. while (!Thread.interrupted()) {
  4. try {
  5. Document document = completionService.take().get();
  6. updateInvertedIndex(document.getVoc(), invertedIndex,
  7. document.getFileName());
  8. } catch (InterruptedException e) {
  9. break;
  10. }
  11. }
  12. while (true) {
  13. Future<Document> future = completionService.poll();
  14. if (future == null)
  15. break;
  16. Document document = future.get();
  17. updateInvertedIndex(document.getVoc(), invertedIndex,
  18. document.getFileName());
  19. }
  20. } catch (InterruptedException | ExecutionException e) {
  21. e.printStackTrace();
  22. }
  23. }

最后,updateInvertedIndex方法将从文档获得的词汇表、倒排索引和文件名作为参数处理。该方法处理词汇表的所有单词。如果单词没有在索引中出现,我们使用computeIfAbsent()方法将其添加到invertedIndex中。

  1. private void updateInvertedIndex(Map<String, Integer> voc,
  2. ConcurrentHashMap<String, ConcurrentLinkedDeque<String>>
  3. invertedIndex, String fileName) {
  4. for (String word : voc.keySet()) {
  5. if (word.length() >= 3) {
  6. invertedIndex.computeIfAbsent(word, k -> new
  7. ConcurrentLinkedDeque<>()).add(fileName);
  8. }
  9. }
  10. }
  • ConcurrentIndexing

这是本例的主类。该类创建并启动了所有组件,等待执行过程结束,并且在控制台输出最终执行时间。

首先,它要创建并初始化执行过程中所需的所有变量。

  • 运行InvertedTask任务的执行器。和前面的例子一样,我们使用机器的核心数作为执行器中的最大工作线程数。不过在本例中,我们预留了一个核来执行独立线程。
  • 用于运行任务的CompletionService对象。我们使用此前创建的执行器来初始化该对象。
  • 用于存储倒排索引的ConcurrentHashMap
  • 一个含有所有待处理文档的File对象数组。
    相关方法如下所示:
  1. public class ConcurrentIndexing {
  2. public static void main(String[] args) {
  3. int numCores=Runtime.getRuntime().availableProcessors();
  4. ThreadPoolExecutor executor=(ThreadPoolExecutor)
  5. Executors.newFixedThreadPool(Math.max(numCores-1, 1));
  6. ExecutorCompletionService<Document> completionService=new
  7. ExecutorCompletionService<>(executor);
  8. ConcurrentHashMap<String, ConcurrentLinkedDeque<String>>
  9. invertedIndex=new ConcurrentHashMap
  10. <String,ConcurrentLinkedDeque<String>> ();
  11. Date start, end;
  12. File source = new File("data");
  13. File[] files = source.listFiles();

然后,处理数组中的所有文件,为每个文件创建一个InvertedTask对象,并且使用submit()方法将其发送给CompletionService类。我们已经介绍了一种避免执行器过载的方法。我们可以检查待处理任务队列的规模,如果该队列的规模大于1000,就将该线程休眠,队列规模不再减小之时,我们就不再发送更多任务了。

  1. start=new Date();
  2. for (File file : files) {
  3. IndexingTask task=new IndexingTask(file);
  4. completionService.submit(task);
  5. if (executor.getQueue().size()>1000) {
  6. do {
  7. try {
  8. TimeUnit.MILLISECONDS.sleep(50);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. } while (executor.getQueue().size()>1000);
  13. }
  14. }

然后,创建两个InvertedIndexTask对象来处理由InvertedTask任务返回的结果,并且将其作为常规Thread对象来执行。

  1. InvertedIndexTask invertedIndexTask=new InvertedIndexTask
  2. (completionService,invertedIndex);
  3. Thread thread1=new Thread(invertedIndexTask);
  4. thread1.start();
  5. InvertedIndexTask invertedIndexTask2=new InvertedIndexTask
  6. (completionService,invertedIndex);
  7. Thread thread2=new Thread(invertedIndexTask2);
  8. thread2.start();

启动所有要素之后,可使用shutdown()方法和awaitTermination()方法等待执行器结束。awaitTermination()方法将在所有InvertedTask任务执行完毕后返回,这样我们就可以结束执行InvertedIndexTask任务的线程了。要做到这一点,我们需要中断这些线程(参看有关InvertedIndexTask的注释),如下面的代码片段所示:

  1. executor.shutdown();
  2. try {
  3. executor.awaitTermination(1, TimeUnit.DAYS);
  4. thread1.interrupt();
  5. thread2.interrupt();
  6. thread1.join();
  7. thread2.join();
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }

最后,我们在控制台输出倒排索引的大小以及所有处理过程的执行时间:

  1. end=new Date();
  2. System.out.println("Execution Time: "+(end.getTime()-
  3. start.getTime()));
  4. System.out.println("invertedIndex: "+invertedIndex.size());
  5. }
  6. }

5.3.4 第二个并发版本:每个任务多个文档

我们还实现了本例的第二个并发版本。基本原理与第一个版本的相同,但是在本例中,每个任务将处理多个文档而不是仅处理一个文档。每个任务处理的文档数将作为main()方法的一个输入参数。我们测试了每个任务处理100、1000和5000个文档的结果。

为实现这一新方式,需要实现下述三个新类。

  • MultipleIndexingTask类:该类与IndexingTask类相当,但是它处理的是一个文档列表,而不仅仅是一个文档。
  • MultipleInvertedIndexTask类:该类与InvertedIndexTask类相当,只不过现在任务要检索的是一个Document对象列表,而不仅仅是一个Document对象。
  • MultipleConcurrentIndexing类:该类与ConcurrentIndexing类相当,只不过它还用到了其他新类。

鉴于这一版本的源代码和前一版本多有相似,我们仅给出其中的不同点。

  • MultipleIndexingTask

正如前面提到的,该类和此前介绍的IndexingTask类很类似。主要区别在于它使用的是一个File对象列表,而不仅仅是一个文件。

  1. public class MultipleIndexingTask implements Callable<List<Document>> {
  2. private List<File> files;
  3. public MultipleIndexingTask(List<File> files) {
  4. this.files = files;
  5. }

call()方法返回的是一个Document对象列表,而不仅仅是一个Document对象。

  1. @Override
  2. public List<Document> call() throws Exception {
  3. List<Document> documents = new ArrayList<Document>();
  4. for (File file : files) {
  5. DocumentParser parser = new DocumentParser();
  6. Hashtable<String, Integer> voc = parser.parse
  7. (file.getAbsolutePath());
  8. Document document = new Document();
  9. document.setFileName(file.getName());
  10. document.setVoc(voc);
  11. documents.add(document);
  12. }
  13. return documents;
  14. }
  15. }
  • MultipleInvertedIndexTask

正如前面提到的,该类和前面介绍的InvertedIndexClass类相似。主要区别在于run()方法。poll()方法返回的Future对象返回了一个Document对象列表,因此我们要处理的是整个列表。请看如下代码片段:

  1. @Override
  2. public void run() {
  3. try {
  4. while (!Thread.interrupted()) {
  5. try {
  6. List<Document> documents = completionService.take().get();
  7. for (Document document : documents) {
  8. updateInvertedIndex(document.getVoc(), invertedIndex,
  9. document.getFileName());
  10. }
  11. } catch (InterruptedException e) {
  12. break;
  13. }
  14. }
  15. while (true) {
  16. Future<List<Document>> future = completionService.poll();
  17. if (future == null)
  18. break;
  19. List<Document> documents = future.get();
  20. for (Document document : documents) {
  21. updateInvertedIndex(document.getVoc(), invertedIndex,
  22. document.getFileName());
  23. }
  24. }
  25. } catch (InterruptedException | ExecutionException e) {
  26. e.printStackTrace();
  27. }
  28. }
  • MultipleConcurrentIndexing

正如前面提到的,该类和ConcurrentIndexing类很相似。唯一不同的是利用新类,并且使用第一个参数来决定每个任务所处理的文档数量。我们有如下方法:

  1. start=new Date();
  2. List<File> taskFiles=new ArrayList<>();
  3. for (File file : files) {
  4. taskFiles.add(file);
  5. if (taskFiles.size()==NUMBER_OF_TASKS) {
  6. MultipleIndexingTask task=new MultipleIndexingTask(taskFiles);
  7. completionService.submit(task);
  8. taskFiles=new ArrayList<>();
  9. if (executor.getQueue().size()>10) {
  10. do {
  11. try {
  12. TimeUnit.MILLISECONDS.sleep(50);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. } while (executor.getQueue().size()>10);
  17. }
  18. }
  19. }
  20. if (taskFiles.size()>0) {
  21. MultipleIndexingTask task=new MultipleIndexingTask(taskFiles);
  22. completionService.submit(task);
  23. }
  24. MultipleInvertedIndexTask invertedIndexTask=new
  25. MultipleInvertedIndexTask(completionService,invertedIndex);
  26. Thread thread1=new Thread(invertedIndexTask);
  27. thread1.start();
  28. MultipleInvertedIndexTask invertedIndexTask2=new
  29. MultipleInvertedIndexTask (completionService,invertedIndex);
  30. Thread thread2=new Thread(invertedIndexTask2);
  31. thread2.start();

5.3.5 对比解决方案

让我们对比一下前面已经实现的该例三个版本的解决方案。正如前面提到的,在文档集合方面,我们选取了有关电影信息的维基百科页面构建了一个含有100 673个文档的文档集合。我们将每个维基百科页面转换为一个文本文件。你可以下载该文档集合以及所有有关本书的信息。

我们执行了五个版本的解决方案。

  • 串行版本。
  • 一个任务处理一个文档的并发版本。
  • 一个任务处理多个文档的并发版本,分为每个任务分别处理100、1000和5000个文档的情况。

我们采用JMH框架(请查看名为“Code Tools: jmh”的文章)执行这些示例,该框架允许你用Java实现微型基准测试。使用一个面向基准测试的框架是比较好的解决方案,它直接用currentTimeMillis()方法或者nanoTime()方法度量时间。我们在两种不同的架构上分别执行这些示例10次。

  • 一台计算机配置了Core i5-5300处理器、Windows 7操作系统和16GB的RAM。该处理器有两个核,且每个核可以执行两个线程,这样我们就有四个并行线程。
  • 另一台计算机配置了AMD A8-640处理器、Windows 10操作系统和8GB的RAM。该处理器有四个核。

下表给出了五个版本的执行时间。

算法Intel架构AMD架构
执行时间(毫秒)执行时间(毫秒)
串行版本29 305.63137 519.75
并发版本:一个任务处理一个文档13 704.1775 593.93
并发版本:一个任务处理100个文档26 579.30195 928.209
并发版本:一个任务处理1000个文档25 126.47133 080.655
并发版本:一个任务处理5000个文档23 454.38118 789.394

我们可以得出下面结论。

  • 并发版本总是比串行版本性能好。
  • 对于并发版本而言,如果增加每个任务所处理的文档数量,将获得更好的结果。

在本例中,两种架构上的执行结果有较大差异,但是其他因素,例如硬盘、内存空间和处理速度等,实际上对本例的结果有较大的影响,因为本例读取的文件超过100 000份,会频繁使用内存。

如果我们使用加速比来比较串行版本和并发版本的处理速度,就会得到如下结果。

\begin{aligned}&S_{{\rm AMD}}=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{137~519.75}{75~5.93.93}=1.82\\&S_{{\rm Intel}}=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{29~305.63}{13~704.17}=2.13\end{aligned}

5.3.6 其他相关方法

本章,我们用AbstractExecutorService接口(在ThreadPoolExecutor类中实现)和CompletionService接口(在ExecutorCompletionService中实现)的一些方法来管理Callable任务的结果。然而,在此我们还想提及我们曾用过的这些方法的其他版本以及其他一些方法。

关于AbstractExecutorService接口,我们介绍下述方法。

  • invokeAll (Collection> tasks, long timeout, TimeUnit unit):当作为参数传递的Callable任务列表中的所有任务完成执行,或者执行时间超出了第二、第三个参数指定的时间范围时,该方法返回一个与该Callable任务列表相关联的Future对象列表。
  • invokeAny (Collection> tasks, long timeout, TimeUnit unit):当作为参数传递的Callable任务列表中的任务在超时(由第二和第三个参数指定的期限)之前完成其执行并且没有抛出异常时,该方法返回Callable任务列表中第一个任务的结果。如果超时,那么该方法抛出一个TimeoutException异常。

关于CompletionService接口,我们介绍下述方法。

  • poll()方法:我们用到了该方法带有两个参数的版本,不过该方法还有一个不带参数的版本。从内部数据结构来看,该版本检索并且删除自上一次调用poll()take()方法以来下一个已完成任务的Future对象。如果没有任何任务完成,执行该方法将返回null值。
  • take()方法:该方法和前一个方法类似,只不过如果没有任何任务完成,它将休眠该线程,直到有一个任务执行完毕为止。