9.2 第一个例子:无索引条件下的数据搜索

在第8章中,你学会了如何实现一个搜索工具,使用倒排索引查找与输入查询相似的文档。该数据结构使搜索操作更加方便和快捷,但是在有些场景下,你需要针对一个大规模数据集做搜索操作,而且并没有倒排索引帮忙。这时需要处理该数据集的所有元素以获得正确结果。在本例中,你将看到这样一个场景,并且看到Stream API的reduce()方法如何能帮助你。

为了实现该示例,将使用亚马逊联合采购网络元数据的数据子集,其中包含了亚马逊销售的约548 552个商品的相关信息,包括商品名称、销售排名、相似商品列表、类别和评论等。可以在SNAP搜索“Amazon product co-purchasing network metadata”下载该数据集。我们选取其中的前20 000个商品,并且将每个商品记录都存放到一个单独的文件中。为了便于数据处理,我信更改了其中某些字段的格式。所有字段都采用property:value格式。

9.2.1 基本类

有一些类是并发版本和串行版本共享的。在此详细介绍一下其中的每个类。

  • Product

Product类存放了有关商品的信息。下面给出了Product类。

  • id:这是商品的唯一标识符。
  • asin:这是亚马逊的标准身份识别码。
  • title:这是商品的名称。
  • group:这是商品的分组。该属性的取值可以为Baby ProductBookCDDVDMusicSoftwareSportsToyVideo或者Video Games
  • salesrank:这表示亚马逊公司的销售排名。
  • similar:这是文件中所包含的相似项的数目。
  • categories:这是一个String对象列表,其中含有指派给该商品的类别。
  • reviews:这是一个Review对象列表,其中含有该商品的评论(用户和评分)。
    该类仅包含属性定义以及与之对应的getXXX()方法和setXXX()方法,因此这里不再给出其源代码。
  • Review

如前所述,Product类含有一个Review对象列表,其中含有用户对商品的评论信息。该类用如下两个属性存放了每个评论的信息。

  • user:进行评论的用户的内部编码。
  • value:用户对商品的评分。
    该类仅包含属性定义以及对应的getXXX()setXXX()方法,因此不再给出源代码。
  • ProductLoader

ProductLoader类允许你从某个文件将有关某一商品的信息加载到Product对象。该类实现了load()方法,该方法接收一个Path对象(其中含有商品信息的文件路径),并且返回一个Product对象。其源代码如下:

  1. public class ProductLoader {
  2. public static Product load(Path path) {
  3. try (BufferedReader reader = Files.newBufferedReader(path)) {
  4. Product product=new Product();
  5. String line=reader.readLine();
  6. product.setId(line.split(":")[1]);
  7. line=reader.readLine();
  8. product.setAsin(line.split(":")[1]);
  9. line=reader.readLine();
  10. product.setTitle(line.substring (line.indexOf(':')+1));
  11. line=reader.readLine();
  12. product.setGroup(line.split(":")[1]);
  13. line=reader.readLine();
  14. product.setSalesrank(Long.parseLong (line.split(":")[1]));
  15. line=reader.readLine();
  16. product.setSimilar(line.split(":")[1]);
  17. line=reader.readLine();
  18. int numItems=Integer.parseInt(line.split(":")[1]);
  19. for (int i=0; i<numItems; i++) {
  20. line=reader.readLine();
  21. product.addCategory(line.split(":")[1]);
  22. }
  23. line=reader.readLine();
  24. numItems=Integer.parseInt(line.split(":")[1]);
  25. for (int i=0; i<numItems; i++) {
  26. line=reader.readLine();
  27. String tokens[]=line.split(":");
  28. Review review=new Review();
  29. review.setUser(tokens[1]);
  30. review.setValue(Short.parseShort(tokens[2]));
  31. product.addReview(review);
  32. }
  33. return product;
  34. } catch (IOException x) {
  35. throw newe UncheckedIOException(x);
  36. }
  37. }
  38. }

9.2.2 第一种方式:基本搜索

第一种方式是接收一个单词作为输入查询,搜索所有存储商品信息的文件,看看是否在定义商品的某个字段中含有该单词,不论对哪个商品都这样操作。这将仅显示包含该单词的文件名。

为了实现该基本方式,我们实现了ConcurrentMainBasicSearch类,它实现了main()方法。首先,初始化查询和存放所有文件的基本路径。

  1. public class ConcurrentMainBasicSearch {
  2. public static void main(String args[]) {
  3. String query = args[0];
  4. Path file = Paths.get("data");

我们只需要一个流来生成含有结果的字符串列表,如下所示:

  1. try {
  2. Date start, end;
  3. start = new Date();
  4. ConcurrentLinkedDeque<String> results = Files.walk(file,
  5. FileVisitOption.FOLLOW_LINKS).parallel().filter(f ->
  6. f.toString().endsWith(".txt"))
  7. .collect(ArrayList<String>::new,
  8. new ConcurrentStringAccumulator (query), List::addAll);
  9. end = new Date();

我们的流包含下述元素。

(1) 使用Files类的walk()方法启动流,将文件集合的基本Path对象作为参数传递。该方法将所有文件作为流返回,并且返回该路径下的所有目录。

(2) 然后,使用parallel()方法将该流转换成一个并发流。

(3) 我们仅对扩展名为.txt的文件感兴趣,因此使用filter()方法对文件进行筛选。

(4) 最后,使用collect()方法将Path对象流转换为String对象(含有文件名)的ConcurrentLinkedDeque

我们使用collect()方法的三参数版本用到了下述函数型参数。

  • Supplier:使用ArrayList类的new方法引用为每个线程创建一个新的数据结构,以便存放相应结果。
  • Accumulator:我们在ConcurrentStringAccumulator类中实现了自己的Accumulator。稍后将详细介绍该类。
  • Combiner:使用ConcurrentLinkedDeque类的addAll()方法连接两个数据结构。在本例中,会将第二个Collection中的所有元素添加到第一个Collection中。而第一个Collection既可用于进一步的合并,也可以作为最终结果。

最后,在控制台输出从流获得的结果。

  1. System.out.println("Results for Query: "+query);
  2. System.out.println("*************");
  3. results.forEach(System.out::println);
  4. System.out.println("Execution Time: "+(end.getTime()-
  5. start.getTime()));
  6. } catch (IOException e) {
  7. e.printStackTrace();
  8. }
  9. }
  10. }

每当要处理流的一个路径以评估是否必须将其名称包含到结果列表中时,都要执行Accumulator函数型参数。为实现这种功能,我们实现了ConcurrentStringAccumulator类。下面看看该类的详细情况。

ConcurrentStringAccumulator

ConcurrentStringAccumulator类加载了一个带有商品信息的文件,以判断它是否包含查询中的术语。它实现了BiConsumer接口,这是因为我们要将其用作collect()方法的一个参数。使用List类和Path类参数化该接口。

  1. public class ConcurrentStringAccumulator implements BiConsumer
  2. <List<String>, Path> {

它将查询定义为一个内部属性,并该属性在构造函数中被初始化,如下所示。

  1. private String word;
  2. public ConcurrentStringAccumulator (String word) {
  3. this.word=word.toLowerCase();
  4. }

然后,实现在BiConsumer接口中定义的accept()方法。该方法接收两个参数:一个是ConcurrentLinkedDeque类,另一个是Path类。

为了加载文件并且判断它是否包含该查询,使用以下的流。

  1. @Override
  2. public void accept(List<String> list, Path path) {
  3. long counter;
  4. try {
  5. counter = Files.lines(path).map(l -> l.split(":")[1].toLowerCase())
  6. .filter(l -> l.contains(word.toLowerCase())).count();

我们的流包含下述元素。

(1) 首先,使用Files类的lines()方法加载文件中的行到一个流。文件每一行均参照property: value格式。

(2) 其次,使用map()方法获取每个属性的取值。

(3) 然后,使用filter()方法仅选取那些含有待搜索单词的行。

(4) 最后,使用count()方法计算流中剩下的元素数。

如果Counter变量的值大于0,那么该文件包含查询术语,如此便将该文件的名称添加到存放结果的ConcurrentLinkedDeque类。

  1. if (counter>0) {
  2. list.add(path.toString());
  3. }
  4. } catch (Exception e) {
  5. System.out.println(path);
  6. e.printStackTrace();
  7. }
  8. }
  9. }

9.2.3 第二种方式:高级搜索

基本搜索方式存在一些缺陷。

  • 该方式在所有属性中查找查询中的术语,但是或许我们只想对其中的一部分进行查找,例如在商品名称中查找。
  • 该方式仅显示文件名,但是其实还可以显示更多的信息,例如也可以显示商品名称这样的附加信息。

为了解决这些问题,我们将构造一个实现main()方法的ConcurrentMainSearch类。首先,初始化查询和存放所有文件的基础Path对象。

  1. public class ConcurrentMainSearch {
  2. public static void main(String args[]) {
  3. String query = args[0];
  4. Path file = Paths.get("data");

然后,使用下面的流来生成有关Product对象的ConcurrentLinkedDeque类。

  1. try {
  2. Date start, end;
  3. start=new Date();
  4. List<Product> results = Files.walk(file, FileVisitOption
  5. .FOLLOW_LINKS).parallel().filter(f -> f
  6. .toString().endsWith(".txt"))
  7. .collect(ArrayList<Product>::new, new
  8. ConcurrentObjectAccumulator(query),
  9. List::addAll);

这个流和在基本方式中实现的流具有相同的元素,只是有下述两点变化。

  • collect()方法中,在Accumulator参数中使用了ConcurrentObjectAccumulator类。
  • 使用Product对象参数化ConcurrentLinkedDeque类。

最后,在控制台中输出结果。但是在本例中,我们输出每个商品的名称。

  1. System.out.println("Results");
  2. System.out.println("*************");
  3. results.forEach(p -> System.out.println(p.getTitle()));
  4. System.out.println("Execution Time: "+(end.getTime()-
  5. start.getTime()));
  6. } catch (IOException e) {
  7. e.printStackTrace();
  8. }
  9. }
  10. }

你可以更改上述代码,输出有关商品的其他任何信息,例如销售排名或者类别。

与之前相比,这一实现最重要的变化在于ConcurrentObjectAccumulator类。下面详细介绍一下该类。

ConcurrentObjectAccumulator

ConcurrentObjectAccumulator类实现了BiConsumer接口,该接口由ConcurrentLinkedDeque类和Path类参数化,这是因为我们希望在collect()方法中使用它。该类定义了名为word的内部属性来存放查询中的术语。该属性在该类的构造函数中初始化。

  1. public class ConcurrentObjectAccumulator implements BiConsumer
  2. <List<Product>, Path> {
  3. private String word;
  4. public ConcurrentObjectAccumulator(String word) {
  5. this.word = word;
  6. }

accept()方法(在BiConsumer接口中定义)的实现非常简单。

  1. @Override
  2. public void accept(List<Product> list, Path path) {
  3. Product product=ProductLoader.load(path);
  4. if (product.getTitle().toLowerCase().contains(word.toLowerCase())){
  5. list.add(product);
  6. }
  7. }
  8. }

该方法接收指向待处理文件的Path对象作为参数,并采用ConcurrentLinkedDeque类存放结果。我们使用ProductLoader类将待处理文件加载到Product对象,然后检查该商品的名称中是否包含查询中的术语。如果包含,那么将该Product对象添加到ConcurrentLinkedDeque类。

9.2.4 本例的串行实现

与本书的其他例子一样,两个版本的搜索操作都实现了一个串行版本,以便验证并发流是否能带来性能上的改进。

你可以在前面介绍的四个类中删除Stream对象中parallel()方法的调用(该方法使流变为并发流),实现与之对等的串行版本。

在本书的源代码中,我们给出了SerialMainBasicSearchSerialMainSearchSerialStringAccumulatorSerialObjectAccumulator等类,它们都是按照前面的更改方法得到的与并行版对等的串行版类。

9.2.5 对比实现方案

我们对实现方案(两种方案:串行版和并发版)进行了测试,以比较其执行时间。为进行测试,采用了三种查询。

  • Patterns
  • Java
  • Tree

我们采用JMH框架执行了这些示例,该框架允许在Java中实现微型基准测试。使用面向基准测试的框架是比较好的解决方案,它直接用currentTimeMillis()nanoTime()等方法度量时间。在两种不同的架构上分别执行这些示例10次。

  • 一台计算机配置了Intel Core i5-5300处理器、Windows 7操作系统和16GB的RAM。该处理器有两个核,且每个核可以执行两个线程,这样就有四个并行线程。
  • 另一台计算机配置了AMD A8-640处理器、Windows 10操作系统和8GB的RAM。该处理器有四个核。

下表给出了用毫秒表示的结果。首先,展示字符串搜索操作的结果。

字符串搜索
Intel架构AMD架构
JavaPatternsTreeJavaPatternsTree
串行版735.569709.484700.9292245.6032243.1522207.034
并发版401.276524.252395.0221058.7121045.2011057.155

现在,对象搜索操作的结果如下。

字符串搜索
Intel架构AMD架构
JavaPatternsTreeJavaPatternsTree
串行版867.534840.082854.2992723.5352634.6142640.329
并发版460.29463.201476.2441218.4251232.451204.245

可以得到如下结论。

  • 执行不同的查询时,结果非常相似。它们之间仅相差数毫秒。
  • 字符串搜索的执行时间总是比对象搜索的执行时间更优。
  • 在所有情况下,并发流的性能都比串行流更好。

如果对并发版和串行版加以比较,例如,使用加速比比较查询Patterns的字符串搜索情况,可以得到下面的结果。

\begin{aligned}&S_{{\rm AMD}}=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{2243.152}{1045.201}=2.15\\&S_{{\rm Intel}}=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{709.484}{524.252}=1.35\end{aligned}