8.3 第二个例子:信息检索工具

根据维基百科,信息检索的定义如下。

“从信息资源集合中获取与某一信息需求相关的信息资源。”

通常,信息资源是一个文档集合,而信息需求则是一个概述了需求的单词集合。为了快速搜索文档集合,我们采用一种名为倒排索引的数据结构。该结构存放了文档集合中的所有单词,而且对于每个单词,都有一个包含该单词文档的列表。在第5章中我们已经构建了一个文档集合的倒排索引,该文档集合包含有100 673个有关电影信息的维基百科页面。我们已将每个维基百科页面转换成一个文本文件。该倒排索引存放在一个文本文件中,且该文件的每一行都包含单词、单词在文档中出现的频率、所有出现了该单词的文档以及在该文档中的tfxidf属性。这些文档都按照tfxidf属性的值进行排序。例如,该文件中的一行如下所示:

  1. velankanni:4,18005302.txt:10.13,20681361.txt:10.13,45672176.txt:10
  2. 13,6592085.txt:10.13

这一行包含了单词velankanni,它的DF值为4。它在文档18005302.txt中出现且tfxidf值为10.13,在20681361.txt文档中出现且tfxidf值为10.13,在45672176.txt文档中出现且tfxidf值为10.13,在6592085.txt文档中出现且tfxidf值也为10.13。

本章将使用流API来实现不同版本的搜索工具,并且获取有关倒排索引的信息。

8.3.1 约简操作简介

正如本章前面提到的,约简操作将汇总操作应用于流的元素以生成一个单独的汇总结果。该结果可以与流的元素类型相同,也可以不同。计算一个数值流的和就是reduce()操作的一个简单示例。

流API提供了reduce()方法来实现约简操作。该方法有下述三个版本。

  • reduce(accumulator):该版本将accumulator函数应用于流的所有元素。在这种情况下没有初始值。它返回一个含有accumulator函数最终结果的Optional对象,或者当该流为空时返回一个空的Optional对象。accumulator函数必须是一个associative函数,它实现了BinaryOperator接口。两个参数既可以是流元素,也可以是之前调用accumulator函数所返回的部分结果。
  • reduce(identity,accumulator):当最终结果和流的元素类型相同时,必须采用该版本。标识值必须为accumulator函数的标识值。也就是说,如果将accumulator函数应用于标识值和任意值V,必须返回同样的值Vaccumulator(identity,V)=V。该标识值用作accumulator函数的第一个结果,如果流没有元素,则该值作为返回值。正如在另一版本中一样,accumulator必须是一个实现BinaryOperator接口的associative函数。
  • reduce(identity, accumulator, combiner):当最终结果与流的元素为不同类型时,必须使用该版本。标识值必须是combiner函数的标识。也就是说,combiner(identity,v)=v。这里的combiner函数必须与accumulator函数兼容,即combiner(u,accumulator (identity,v))=accumulator(u,v)accumulator函数采用局部结果和流的下一个元素生成另一个局部结果。combiner函数采用两个局部结果来生成另一个局部结果。这两个函数必须均是associative函数,但是在这种情况下,accumulator函数是BiFunction接口的实现,而combiner函数是BinaryOperator接口的实现。

reduce()方法有一个局限。如前所述,该函数必须返回单个值。你不应该使用reduce()方法来生成一个Collection对象或者一个复杂对象。首要问题在于性能。正如流API的文档中所说明的,accumulator函数每处理一个元素都会返回一个新值。如果你的accumulator函数处理的是集合,那么每当它处理一个元素时都会创建一个新的集合,这样效率就很低。另一个问题是,如果采用并行流,那么所有的线程都要共享标识值。

如果该值是一个可变对象,例如一个Collection,那么所有的线程都将作用于相同的Collection之上。这样就有悖于reduce()操作的初衷了。此外,combiner()方法总是接收两个相同的Collection(所有的线程仅作用于一个Collection之上)作为参数,这也有悖于reduce()操作的初衷。

如果要实现一个可生成Collection或复杂对象的约简操作,有如下两个供选方案。

  • 使用collect()方法应用可变约简操作。第9章将详细介绍如何在不同的场景下使用该方法。
  • 创建集合并且使用forEach()方法,以便使用所需值填充Collection

本例将使用reduce()方法来获取有关倒排索引的信息,使用forEach()方法将该索引约简成与某一查询相关的文档列表。

8.3.2 第一种方式:全文档查询

在第一种方式中,将用到与某一单词相关的所有文档。该搜索过程的实现步骤如下。

  • 在倒排索引中选取与查询中单词相对应的行。
  • 将所有的文档列表组合成单个列表。如果一个文档与两个或者多个单词相关,那么将在该文档中出现的这些单词的tfxidf值相加,得到该文档最终的tfxidf值。如果一个文档仅与一个单词相关,那么该单词的tfxidf值就是该文档的最终tfxidf值。
  • 使用文档的tfxidf值自高到低进行排序。
  • tfxidf值排名前100的文档展现给用户。

这一版本已经在ConcurrentSearch类的basicSearch()方法中实现。该方法的源代码如下所示:

  1. public static void basicSearch(String query[]) throws IOException {
  2. Path path = Paths.get("index", "invertedIndex.txt");
  3. HashSet<String> set = new HashSet<>(Arrays.asList(query));
  4. QueryResult results = new QueryResult(new ConcurrentHashMap<>());
  5. try (Stream<String> invertedIndex = Files.lines(path)) {
  6. invertedIndex.parallel().filter(line -> set
  7. .contains(Utils.getWord(line)))
  8. .flatMap(ConcurrentSearch::basicMapper)
  9. .forEach(results::append);
  10. results.getAsList().stream().sorted().limit(100)
  11. .forEach(System.out::println);
  12. System.out.println("Basic Search Ok");
  13. }
  14. }

我们接收一个含有查询单词的字符串对象数组。首先,将该数组转换成一个集合。然后,使用一个try-with-resources流处理invertedIndex.txt文件的各行,正是该文件存放了倒排索引。由于采用了try-with-resources流,因此不需要担心文件的打开和关闭。对该流的聚合操作将会生成一个含有相关文档的QueryResult对象。可以使用下述方法获取该列表。

  • parallel():首先,获取一个并行流以提高搜索过程的性能。
  • filter():选取将集合中单词与查询中单词相关联的行。Utils.getWord()方法将获取该行的单词。
  • flatMap():将字符串流(其中每个字符串都是倒排索引中的一行)转换成一个Token对象流。每个Token对象包含了文件中一个单词的tfxidf值。对于每一行,生成的Token对象数与包含该单词的文件数相同。
  • forEach():使用该类的add()方法添加每个Token对象,进而生成QueryResult对象。

一旦创建了QueryResult对象,则要使用以下方法创建另一个流,以便获得最终结果列表。

  • getAsList()QueryResult对象返回一个含有相关文档的列表。
  • stream():用于创建一个处理该列表的流。
  • sorted():用于按照文档的tfxidf值排列文档列表。
  • limit():用于获得前100个结果。
  • forEach():用于处理100个结果并且将信息输出到屏幕。

下面详细介绍一下在本例中用到的辅助类和方法。

  • basicMapper()方法

该方法将一个字符串流转换成一个Token对象流。稍后将详细介绍,Token中存放文档中一个单词的tfxidf值。该方法接收一个字符串(倒排索引中的一行)。它将一行分割成若干个Token,并且生成与单词所在文档数相同的Token对象。该方法在ConcurrentSearch类中实现,其源代码如下:

  1. public static Stream<Token> basicMapper(String input) {
  2. ConcurrentLinkedDeque<Token> list = new ConcurrentLinkedDeque();
  3. String word = Utils.getWord(input);
  4. Arrays.stream(input.split(",")).skip(1).parallel()
  5. .forEach(token -> list.add(new Token(word, token)));
  6. return list.stream();
  7. }

首先,创建一个ConcurrentLinkedDeque对象来存储Token对象。然后,使用split()方法分割字符串,并且使用Arrays类的stream()方法生成流。跳过第一个元素(其中包含单词的信息),并且以并行方式处理剩下的Token。对于每个元素,均创建一个新的Token对象(将该单词和具有file:tfxidf格式的Token传递给构造函数),并且将其添加到该流。最后,使用ConcurrenLinkedDeque对象的stream()方法返回一个流。

  • Token

如前所述,该类存储了文档中某一单词的tfxidf值。这样,该类就有三个属性用于存放这些信息,如下所示:

  1. public class Token {
  2. private final String word;
  3. private final double tfxidf;
  4. private final String file;

构造函数接收两个字符串。第一个参数中含有该单词,而第二个参数含有文件和以file:tfxidf格式出现的tfxidf属性,所以要按照如下代码进行处理。

  1. public Token(String word, String token) {
  2. this.word=word;
  3. String[] parts=token.split(":");
  4. this.file=parts[0];
  5. this.tfxidf=Double.parseDouble(parts[1]);
  6. }

最后,增加了获取(而不是设置)这三个属性值的方法,以及一个将对象转换成字符串的方法,如下所示:

  1. @Override
  2. public String toString() {
  3. return word+":"+file+":"+tfxidf;
  4. }
  • QueryResult

该类存放了与某个查询相关的文档列表。从内部来看,该类使用一个Map来存放相关文档信息。其键为文档的文件名,其值为一个Document对象,其中包含了文件名和该文档相对于该查询的tfxidf值总和,如下所示:

  1. public class QueryResult {
  2. private Map<String, Document> results;

使用该类的构造函数具体实现将用到的Map接口。在并发版本中使用ConcurrentHashMap,在串行版本中使用HashMap

  1. public QueryResult(Map<String, Document> results) {
  2. this.results=results;
  3. }

该类包含了append方法,它将一个Token插入Map,如下所示:

  1. public void append(Token token) {
  2. results.computeIfAbsent(token.getFile(), s -> new
  3. Document(s)).addTfxidf(token.getTfxidf());
  4. }

如果没有与文件相关的Document对象,那么使用computeIfAbsent()方法创建一个新的Document对象;如果Document对象早已存在,该方法会获取相应的Document对象,并且使用addTfxidf()方法将Token的tfxidf值加到文档的总tfxidf值。

最后,还引入了一个方法以获取Map,作为一个列表,如下所示:

  1. public List<Document> getAsList() {
  2. return new ArrayList<>(results.values());
  3. }

Document类将文件名以字符串形式保存,将总tfxidf值以DoubleAdder形式保存。该类是Java 8的一个新特性,可以从不同线程汇总计算变量的值,而无须担心同步问题。它实现了Comparable接口来按照文档的tfxidf值对其进行排序,这样tfxidf值最高的文档就会排到第一位。其源代码非常简单,此处不再给出。

8.3.3 第二种方式:约简的文档查询

第一种方法是为每个单词和文件创建一个新的Token对象。注意,有一些常见词(例如the)会关联大量的文档,但是其中的大多数tfxidf值都很低。我们修改了自己的映射器方法,对于每个单词仅考虑与之相关的100个文件,这样生成的Token对象数量就比较小了。

我们在ConcurrentSearch类的reducedSearch()方法中实现了该版本。该方法与basicSearch()方法非常类似,它仅仅改变了生成QueryResult对象的流操作,如下所示:

  1. invertedIndex.parallel().filter(line -> set
  2. .contains(Utils.getWord(line)))
  3. .flatMap(ConcurrentSearch::limitedMapper)
  4. .forEach(results::append);

现在,使用limitedMapper()方法作为flatMap()方法中的函数。

limitedMapper()方法

该方法与basicMapper()方法类似,但是如前所述,仅考虑与每个单词相关的前100个文档。因为文档均按照其tifxidf值进行排序,所以采用该词重要程度较高的前100个文档,如下所示:

  1. public static Stream<Token> limitedMapper(String input) {
  2. ConcurrentLinkedDeque<Token> list = new ConcurrentLinkedDeque();
  3. String word = Utils.getWord(input);
  4. Arrays.stream(input.split(",")).skip(1).limit(100).parallel().forEach(token
  5. -> {
  6. list.add(new Token(word, token));
  7. });
  8. return list.stream();
  9. }

它和basicMapper()方法唯一的区别在于对limit(100)的调用,这将选取流的前100个元素。

8.3.4 第三种方式:生成一个含有结果的HTML文件

使用Web搜索引擎(例如Google)作为搜索工具进行搜索时,它会返回搜索的结果(最重要的10个结果),而且每个结果都显示了文档的标题和出现所搜索单词的文档片段。

搜索工具的第三种方法基于第二种方法,只是增加了第三个流来生成一个含有搜索结果的HTML文件。对于每个结果,我们将显示文档的标题以及含有查询中单词的三行片段。要实现这一目标,需要访问在倒排索引中出现的文件。这些文件已经存储在一个名为docs的文件夹中。

第三种方法在ConcurrentSearch类的htmlSearch()方法中实现。该方法的第一部分与reducedSearch()方法相同,它构造了含有100个结果的QueryResult对象。

  1. public static void htmlSearch(String query[], String fileName) throws
  2. IOException {
  3. Path path = Paths.get("index", "invertedIndex.txt");
  4. HashSet<String> set = new HashSet<>(Arrays.asList(query));
  5. QueryResult results = new QueryResult(new ConcurrentHashMap<>());
  6. try (Stream<String> invertedIndex = Files.lines(path)) {
  7. invertedIndex.parallel().filter(line -> set
  8. .contains(Utils.getWord(line)))
  9. .flatMap(ConcurrentSearch::limitedMapper)
  10. .forEach(results::append);

然后,创建文件并写入输出结果和HTML头。

  1. path = Paths.get("output", fileName + "_results.html");
  2. try (BufferedWriter fileWriter = Files.newBufferedWriter(path,
  3. StandardOpenOption.CREATE)) {
  4. fileWriter.write("<HTML>");
  5. fileWriter.write("<HEAD>");
  6. fileWriter.write("<TITLE>");
  7. fileWriter.write("Search Results with Streams");
  8. fileWriter.write("</TITLE>");
  9. fileWriter.write("</HEAD>");
  10. fileWriter.write("<BODY>");
  11. fileWriter.newLine();

然后,引入在HTML文件中生成结果的流。

  1. results.getAsList().stream().sorted().limit(100).map(new
  2. ContentMapper(query)).forEach(l -> {
  3. try {
  4. fileWriter.write(l);
  5. fileWriter.newLine();
  6. } catch (IOException e) {
  7. e.printStackTrace();
  8. }
  9. });
  10. fileWriter.write("</BODY>");
  11. fileWriter.write("</HTML>");
  12. }

我们用到了以下方法。

  • getAsList():用于获取与查询相关的文档列表。
  • stream():用于生成一个顺序流。无法并行化该流。如果试图这样做,最终文件中的结果则不会按照文档的tfxidf值排序。
  • sorted():用于按照tfxidf属性对结果排序。
  • map():使用ContentMapper类将每个结果对应的Result对象转换成为一个含有HTML代码的字符串,本章稍后将详细介绍该类。
  • forEach():将map()方法返回的String对象输出到文件。Stream对象的方法不能抛出校验异常,因此要使用一个try…catch代码块抛出异常。

下面详细介绍一下ContentMapper类。

ContentMapper

ContentMapper类是Function接口的一个实现,它将一个Result对象转换成一个HTML代码块,其中含有文档标题以及含有查询中一个或多个单词的三行文档片段。

该类使用一个内部属性来存放查询,而且实现了一个构造函数来初始化该属性,如下所示:

  1. public class ContentMapper implements Function<Document, String> {
  2. private String query[];
  3. public ContentMapper(String query[]) {
  4. this.query = query;
  5. }

该文档的标题存放在文件的第一行中。使用try-with-resources指令和File类的lines()方法,创建含有该文件各行内容的String对象流,并且使用findFirst()方法以字符串形式获取第一行。

  1. public String apply(Document d) {
  2. String result = "";
  3. try (Stream<String> content = Files.lines(Paths.get("docs",d
  4. .getDocumentName()))) {
  5. result = "<h2>" + d.getDocumentName() + ": " +
  6. content.findFirst().get() + ": " +
  7. d.getTfxidf() + "</h2>";
  8. } catch (IOException e) {
  9. e.printStackTrace();
  10. throw new UncheckedIOException(e);
  11. }

然后,采用一种类似结构,不过在本例中,我们使用filter()方法获取那些仅包含查询中一个或多个单词的行,使用limit()方法选取其中的三行。然后,使用map()方法为每个段落添加HTML标记(

),并使用reduce()方法完成含有选定行的HTML代码。

  1. try (Stream<String> content = Files.lines(Paths.get ("docs",
  2. d.getDocumentName()))) {
  3. result += content.filter(l -> Arrays.stream(query)
  4. .anyMatch (l.toLowerCase()::contains))
  5. .limit(3).map(l -> "<p>"+l+"</p>")
  6. .reduce("",String::concat);
  7. return result;
  8. } catch (IOException e) {
  9. e.printStackTrace();
  10. throw new UncheckedIOException(e);
  11. }
  12. }

8.3.5 第四种方式:预先载入倒排索引

并行执行时,前三种解决方案会存在问题。如前所述,并行流是由Java并发API中的公共Fork/Join池执行的。在第7章中,我们了解了不应该在任务中使用I/O操作来读取或写入数据。这是因为当一个线程阻塞了从(向)文件读取(写入)数据时,该框架就不再使用工作窃取算法。因此将一个文件作为流的来源时,实际上是将自己的并发方案置于不利境地。

这一问题的解决方案之一就是将数据读取到某种数据结构中,然后从该数据结构中创建流。显然,与其他方式相比,这种方式的执行时间要少一些,但是仍要比较串行版本和并发版本,看看并发版本是否如预期那样能够带来更好的性能。这种方式的缺陷在于需要将数据结构存放在内存中,而这需要消耗大量的内存。

第四种方式在ConcurrentSearch类的preloadSearch()方法中实现。该方法接收以一个字符串数组形式存放的查询和一个带有倒排索引数据的ConcurrentInvertedIndex类(稍后将了解该类的详细内容)的对象作为参数。这一版本的源代码如下:

  1. public static void preloadSearch(String[] query,
  2. ConcurrentInvertedIndex invertedIndex) {
  3. HashSet<String> set = new HashSet<>(Arrays.asList(query));
  4. QueryResult results = new QueryResult(new ConcurrentHashMap<>());
  5. invertedIndex.getIndex().parallelStream()
  6. .filter(token -> set.contains(token.getWord()))
  7. .forEach(results::append);
  8. results.getAsList().stream().sorted().limit(100)
  9. .forEach(document -> System.out.println(document));
  10. System.out.println("Preload Search Ok.");
  11. }

ConcurrentInvertedIndex类采用List来存放从文件中读取的Token对象。该类有两个方法来操作该元素列表,即get()set()方法。

与在其他方式中一样,我们使用两个流:第一个用于获取Result对象的ConcurrentLinkedDeque,其中含有整个结果列表;第二个用于在控制台输出结果。与其他版本相比,第二个流并没有改变,但是第一个流发生了变化。在该流中使用了下述方法。

  • getIndex():首先,获取Token对象列表。
  • parallelStream():其次,创建一个并行流来处理该列表的全部元素。
  • filter():选择与查询中单词相关的Token。
  • forEach():对Token列表进行处理,使用append()方法将它们添加到QueryResult对象中。

ConcurrentFileLoader

ConcurrentFileLoader类将含有倒排索引信息的invertedIndex.txt文件内容加载到内存。它提供了一个名为load()的静态方法,该方法接收存放倒排索引的文件路径作为参数,返回一个ConcurrentInvertedIndex对象。代码如下:

  1. public class ConcurrentFileLoader {
  2. public ConcurrentInvertedIndex load(Path path) throws IOException {
  3. ConcurrentInvertedIndex invertedIndex = new ConcurrentInvertedIndex();
  4. ConcurrentLinkedDeque<Token> results=new ConcurrentLinkedDeque<>();

使用try-with-resources结构打开文件并且创建一个流来处理所有行。

  1. try (Stream<String> fileStream = Files.lines(path)) {
  2. fileStream.parallel().flatMap(ConcurrentSearch::limitedMapper)
  3. .forEach(results::add);
  4. }
  5. invertedIndex.setIndex(new ArrayList<>(results));
  6. return invertedIndex;
  7. }
  8. }

在该流中使用了下述方法。

  • parallel():将该流转换成一个并行流。
  • flatMap():使用ConcurrentSearch类的limitedMapper()方法将行转换成一个Token对象流。
  • forEach():处理Token对象列表,使用add()方法将它们添加到ConcurrentLinkedDeque对象中。

最后,将ConcurrentLinkedDeque对象转换成ArrayList,并且在InvertedIndex对象中使用setIndex()方法对其进行设置。

8.3.6 第五种方式:使用我们的执行器

为了更加深入地理解本例,我们还将测试另一个并发版本。正如本章开头提到的,并行流使用Java 8引入的公共Fork/Join池。然而,我们可以借助一个技巧来使用自己的池。如果将自己的方法作为Fork/Join池的一个任务,那么该流的所有操作都会在同一Fork/Join池中执行。为测试该功能,我们在ConcurrentSearch类中增加了executorSearch()方法。该方法接收以字符串对象数组表示的查询作为参数,接收InvertedIndex对象和一个ForkJoinPool对象。该方法的源代码如下:

  1. public static void executorSearch(String[] query,
  2. ConcurrentInvertedIndex invertedIndex, ForkJoinPool pool) {
  3. HashSet<String> set = new HashSet<>(Arrays.asList(query));
  4. QueryResult results = new QueryResult(new ConcurrentHashMap<>());
  5. pool.submit(() -> {
  6. invertedIndex.getIndex().parallelStream()
  7. .filter(token -> set.contains(token.getWord()))
  8. .forEach(results::append);
  9. results.getAsList().stream().sorted().limit(100)
  10. .forEach(document -> System.out.println(document));
  11. }).join();
  12. System.out.println("Executor Search Ok.");
  13. }

执行该方法的内容,其中含有两个流。使用submit()方法将该方法作为Fork/Join池中的一个任务,并且使用join()方法等待其执行完毕。

8.3.7 从倒排索引获取数据:ConcurrentData

我们还实现了一些方法来获取有关倒排索引的信息,这用到了ConcurrentData类中的reduce()方法。

8.3.8 获取文件中的单词数

第一个方法用于计算文件中的单词数。正如在本章前面提到的,倒排索引存储了出现单词的文件。如果想知道文件中出现的单词,必须处理所有的倒排索引。我们实现了该方法的两个版本。第一个是在getWordsInFile1()方法中实现的。它接收文件名和InvertedIndex对象作为参数,如下所示:

  1. public static void getWordsInFile1(String fileName, ConcurrentInvertedIndex
  2. index) {
  3. long value = index.getIndex().parallelStream()
  4. .filter(token -> fileName
  5. .equals(token.getFile())).count();
  6. System.out.println("Words in File "+fileName+": "+value);
  7. }

本例使用getIndex()方法获取Token对象列表,并且使用parallelStream()方法创建一个并行流。然后,使用filter()方法筛选与该文件相关的Token。最后,使用count()方法计算与该文件相关的单词数。

我们还实现了该方法的另一个版本,使用reduce()方法替代count()方法,即getWordsInFile2()方法,如下所示:

  1. public static void getWordsInFile2(String fileName, ConcurrentInvertedIndex
  2. index) {
  3. long value = index.getIndex().parallelStream()
  4. .filter(token -> fileName.equals(token.getFile()))
  5. .mapToLong(token -> 1).reduce(0, Long::sum);
  6. System.out.println("Words in File "+fileName+": "+value);
  7. }

操作的起始顺序与前一个方法相同。获取含有文件中单词的Token对象流时,我们使用mapToInt()方法将该流转换成一个数值为1的流,然后使用reduce()方法将所有的数值1相加。

8.3.9 获取文件的平均tfxidf

我们实现了getAverageTfxidf()方法,该方法计算了文档集合中某个文件中单词的平均tfxidf值。在此使用reduce()方法来展示它是如何运行的。也可以使用其他方法获得更好的性能。

  1. public static void getAverageTfxidf(String fileName,
  2. ConcurrentInvertedIndex index) {
  3. long wordCounter = index.getIndex().parallelStream()
  4. .filter(token -> fileName.equals(token.getFile()))
  5. .mapToLong(token -> 1).reduce(0, Long::sum);
  6. double tfxidf = index.getIndex().parallelStream()
  7. .filter(token -> fileName.equals(token.getFile()))
  8. .reduce(0d,(n,t)-> n+t.getTfxidf(),(n1,n2) -> n1+n2);
  9. System.out.println("Words in File "+fileName+": "+
  10. (tfxidf/wordCounter));
  11. }

我们使用两个流。第一个计算文件中的单词数,而且它和getWordsInFile2()方法的源代码相同。第二个计算文件中所有单词的tfxidf总值。我们使用同样的方法获取含有文件中单词的Token对象流,然后使用reduce方法将所有单词的tfxidf值相加。我们向reduce()方法传递下述三个参数。

  • O:该参数作为标识值传入。
  • (n,t) -> n+t.getTfxidf():该参数作为accumulator函数传入。它接收一个double数值和一个Token对象,并且计算该数值和Tokentfxidf属性值的和。
  • (n1,n2) -> n1+n2:该参数作为combiner函数传入。它接收两个数值并且计算它们的和。

8.3.10 获取索引中的最大tfxidf值和最小tfxidf

我们还在maxTfxidf()方法和minTfxidf()方法中使用reduce()方法来计算最大tfxidf值和最小tfxidf值。

  1. public static void maxTfxidf(ConcurrentInvertedIndex index) {
  2. Token token = index.getIndex().parallelStream()
  3. .reduce(new Token("", "xxx:0"), (t1, t2) -> {
  4. if (t1.getTfxidf()>t2.getTfxidf()) {
  5. return t1;
  6. } else {
  7. return t2;
  8. }
  9. });
  10. System.out.println(token.toString());
  11. }

该方法接收ConcurrentInvertedIndex作为参数。我们使用getIndex()方法来获取Token对象列表。然后,使用parallelStream()方法在该列表上创建一个并行流,并且使用reduce()方法获取具有最高tfxidf值的Token对象。在本例中,使用带有两个参数的reduce()方法,其中一个参数为标识值,另一个为一个accumulator函数。该标识值是一个Token对象。我们并不考虑该单词及其文件名称,但是将其tfxidf属性的值初始化为0。然后,accumulator函数接收两个Token对象作为参数。比较两个对象的tfxidf值属性,并且返回值较大的那个对象。

minTfxidf()方法非常类似,如下所示:

  1. public static void minTfxidf(ConcurrentInvertedIndex index) {
  2. Token token = index.getIndex().parallelStream()
  3. .reduce(new Token("", "xxx:1000000"),(t1, t2) -> {
  4. if (t1.getTfxidf()<t2.getTfxidf()) {
  5. return t1;
  6. } else {
  7. return t2;
  8. }
  9. });
  10. System.out.println(token.toString());
  11. }

对于本例,其主要区别在于对标识值的初始化要采用非常高的tfxidf属性值。

8.3.11 ConcurrentMain

为了测试在以上各节中讲述的方法,我们实现了ConcurrentMain类,该类实现了main()方法以启动测试。在这些测试中,使用了下面三个查询。

  • 查询1:含有jamesbond两个单词。
  • 查询2:含有gonewiththewind等单词。
  • 查询3:含有单词rocky

我们用三个版本的搜索过程测试上述三个查询,度量每次测试的执行时间。所有的测试都含有类似下面的代码:

  1. public class ConcurrentMain {
  2. public static void main(String[] args) {
  3. String query1[]={"james","bond"};
  4. String query2[]={"gone","with","the","wind"};
  5. String query3[]={"rocky"};
  6. Date start, end;
  7. bufferResults.append("Version 1, query 1, concurrent\n");
  8. start = new Date();
  9. ConcurrentSearch.basicSearch(query1);
  10. end = new Date();
  11. bufferResults.append("Execution Time: " + (end.getTime() -
  12. start.getTime()) + "\n");

为从某个文件将倒排索引加载到一个InvertedIndex对象,可以使用下述代码。

  1. ConcurrentInvertedIndex invertedIndex = new
  2. ConcurrentInvertedIndex();
  3. ConcurrentFileLoader loader = new ConcurrentFileLoader();
  4. invertedIndex = loader.load(Paths.get("index",
  5. "invertedIndex.txt"));

为了创建用于executorSearch()方法的执行器,可以使用下面的代码。

  1. ForkJoinPool pool = new ForkJoinPool();

8.3.12 串行版

我们通过SerialSearchSerialDataSerialInvertendIndexSerialFileLoaderSerialMain类实现了该例的串行版。为了实现该版本,我们做了如下改动。

  • 使用顺序流替代并行流。不使用parallel()方法来将流转换成并行流,或者将创建并行流的parallelStream()方法替换为stream()方法,进而创建一个顺序流。
  • SerialFileLoader类中,使用ArrayList代替ConcurrentLinkedDeque

8.3.13 对比两种解决方案

比较一下已实现所有方法的串行版和并行版解决方案。

使用JMH框架来执行它们,该框架允许你在Java中实现微型基准测试。使用一个面向基准测试的框架是比较好的解决方案,它直接用currentTimeMillis()方法或者nanoTime()方法度量时间。在两种不同的架构上分别执行这些示例10次。

  • 一台计算机配置了Intel Core i5-5300处理器、Windows 7操作系统和16GB的RAM。该处理器有两个核,且每个核可以执行两个线程,这样就有四个并行线程。
  • 另一台计算机配置了AMD A8-640处理器、Windows 10操作系统和8GB的RAM。该处理器有四个核。

对于含有单词jamesbond的第一个查询,其执行时间如下(单位:毫秒)。

Intel架构AMD架构
串行版并发版串行版并发版
基本搜索1310.845650.833286.3361732.431
约简搜索1179.955645.1843172.0251521.285
HTML搜索1457.035785.5533351.342089.5
预加载搜索84.17443.716152.663104.394
执行器搜索90.71447.865144.375111.829

对于带有单词gonewiththewind的第二个查询,其执行时间如下(单位:毫秒)。

Intel架构AMD架构
串行版并发版串行版并发版
基本搜索1425.664853.5433822.3221787.31
约简搜索1159.872644.4293236.0211540.008
HTML搜索1428.503807.9553358.6942330.248
预加载搜索75.80349.417161.131120.313
执行器搜索89.73744.969149.358109.485

对于含有单词rocky的第三个查询,执行时间如下(单位:毫秒)。

Intel架构AMD架构
串行版并发版串行版并发版
基本搜索1274.524706.9793163.4591446.918
约简搜索1165.619767.0273167.8871586.318
HTML搜索1167.504677.0013196.0332224.549
预加载搜索74.28745.014140.17101.741
执行器搜索81.92947.868142.389107.507

最后,下表为返回有关倒排索引信息的各方法的平均执行时间(单位:毫秒)。

Intel架构AMD架构
串行版并发版串行版并发版
getWordsInFile180.11237.111121.37979.084
getWordsInFile268.62730.371121.45275.397
getAverageTfxidf127.38262.966259.749145.967
maxTfxidf31.6428.20789.01376.604
minTfxidf40.25630.22891.78482.566

可以得出如下结论。

  • 读取倒排索引以获取相关文档列表时,算法的并发版本展现了更好的性能。
  • 采用预先载入倒排索引的版本时,该算法的并发版本也在各种情况下表现出了较好的性能。
  • 对于那些能够返回倒排索引相关信息的方法,算法的并发版本总是具有更好的性能。

最后使用加速比比较三个查询的并行流和顺序流处理情况,例如,对于预先载入倒排索引的James Bond查询,有如下公式。

\begin{aligned}&S_{{\rm AMD}}=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{152.663}{104.304}=1.46\\&S_{{\rm Intel}}=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{84.174}{43.716}=1.92\end{aligned}

最后,在第三种方法中,我们生成了含有查询结果的HTML网页。对于带有单词james bond的第一个查询,搜索到的前几个结果如下。

8.3 第二个例子:信息检索工具 - 图2

对于含有单词gone with the wind的第二个查询,其搜索到的前几个结果如下。

8.3 第二个例子:信息检索工具 - 图3

最后,查询rocky搜索到的前几个结果如下所示。

8.3 第二个例子:信息检索工具 - 图4