9.3 第二个例子:推荐系统

推荐系统基于用户曾经购买/使用过的商品/服务向其推荐商品或服务,或者基于曾经购买/使用过同样服务的用户所购买/使用过的商品/服务向其推荐商品或服务。

我们使用在上一节介绍过的例子实现了一个推荐系统。商品的每个描述包括很多用户对商品的评论。这些评论中还含有用户对该商品的评分。

在本例中,你将通过这些评论获得某个用户可能感兴趣的商品列表。我们将获得一个用户所购买商品的列表。为了得到该列表,需要对购买过这些商品的用户列表和那些用户所购买过的商品列表进行排序,而这就要用到评论中的平均打分。这样就可以得到针对该用户的建议商品。

9.3.1 公共类

我们在上一节使用的公共类中增加了两个新类。如下所示。

  • ProductReview:该类采用两个新属性扩展了Product类。
  • ProductRecommendation:该类存储了一个商品的推荐信息。

下面看看这两个类的详细信息。

  • ProductReview

ProductReview类扩展了Product类,它增加了两个新属性。

  • buyer:该属性存放了商品客户的名称。
  • value:该属性存放了该客户在其评论中对商品的评价。
    该类中包含了对这两个属性的定义、对应的getXXX()setXXX()方法、一个构造函数(基于Product对象创建ProductReview对象),以及新属性的值。该类非常简单,因此这里不提供其源代码。
  • ProductRecommendation

ProductRecommendation类存放了商品推荐所需的必要信息,包括如下内容。

  • title:我们要推荐的商品名称。
  • value:推荐的分值,这是通过计算商品所有评论的平均分值得到的。
    该类包含了属性定义、相应的getXXX()setXXX()方法,以及compareTo()方法的实现(该类实现了Comparable接口),通过compareTo()方法可以按照降序对推荐评分进行排序。该类非常简单,此处不提供其源码。

9.3.2 推荐系统:主类

我们在ConcurrentMainRecommendation类中实现了我们的算法,以获得针对某个客户的推荐商品列表。该类实现了main()方法,该方法接收要获取推荐商品的客户ID作为参数。我们有如下代码。

  1. public static void main(String[] args) {
  2. String user = args[0];
  3. Path file = Paths.get("data");
  4. try {
  5. Date start, end;
  6. start=new Date();

我们在最终解决方案中使用了不同的流来转换数据。第一个流从其文件中加载整个Product对象列表。

  1. List<Product> productList = Files.walk(file, FileVisitOption
  2. .FOLLOW_LINKS).parallel().filter(f-> f
  3. .toString().endsWith(".txt"))
  4. .collect(ArrayList<Product>::new, new
  5. ConcurrentLoaderAccumulator(),
  6. List::addAll);

该流有如下元素。

(1) 使用Files类的walk()方法启动该流。该方法将创建一个流来处理Data目录下的所有文件和目录。

(2) 然后,使用parallel()方法将该流转换成一个并发流。

(3) 之后,仅获取扩展名为.txt的文件。

(4) 最后,使用collect()方法获取Product对象的ConcurrentLinkedDeque类。该类和之前用到的类非常相似,不同之处是采用了另一个Accumulator。本例用到了ConcurrentLoaderAccumulator类,这将在稍后进行介绍。

一旦获取到商品列表,便准备用一个Map组织这些商品,将客户的标识作为该Map的键。使用ProductReview类来存放有关这些商品的客户信息。我们需要为每个商品评论创建一个ProductReview对象。使用下面的流完成该转换。

  1. Map<String, List<ProductReview>> productsByBuyer=
  2. productList.parallelStream()
  3. .<ProductReview>flatMap(p -> p.getReviews()
  4. .stream().map(r -> new ProductReview(p, r.getUser(),
  5. r.getValue()))).collect(Collectors
  6. .groupingByConcurrent( p -> p.getBuyer()));

该流具有下述元素。

(1) 采用productList对象的parallelStream()方法启动该流,这样就创建了一个并发流。

(2) 然后,使用flatMap()方法将现有的Product对象流转换成一个唯一的ProductReview对象流。

(3) 最后,使用collect()方法生成最后的Map。本例用到了由Collectors类的groupingByConcurrent()方法生成的预定义收集器。返回的收集器将生成一个Map,其键为购买者属性的取值,而其值为一个ProductReview对象列表,其中含有该用户所购买商品的信息。正如该方法的名称所示,该转换将以并发方式执行。

接下来是本例中最重要的一个流。选定某个客户购买的商品并且生成对该客户的推荐。这是由一个流完成的有两个阶段的过程。第一个阶段,获取购买了原客户所购买商品的用户。第二个阶段,生成一个Map,其中含有这些客户所购买的商品,以及这些客户针对商品所做的评论。该流的代码如下。

  1. Map<String,List<ProductReview>> recommendedProducts= productsByBuyer
  2. .get(user).parallelStream().map(p -> p
  3. .getReviews()).flatMap(Collection::stream)
  4. .map(r -> r.getUser()).distinct()
  5. .map(productsByBuyer::get)
  6. .flatMap(Collection::stream)
  7. .collect(Collectors.groupingByConcurrent
  8. (p -> p.getTitle()));

在该流中有如下元素。

(1) 首先,获取用户所购买商品的列表,并且使用parallelStream()方法来生成一个并发流。

(2) 然后,使用map()方法获取所有有关这些商品的评论。

(3) 此时,有了一个List流。将该流转换成一个Review对象流。现在,就有了一个含有用户所购买商品的全部评论的流。

(4) 然后,将该流转换成一个String对象流,其中含有提交这些评论的用户的名称。

(5) 然后,使用distinct()方获取唯一的用户名称。现在就有了一个String对象流,其中包含了那些与原用户购买了相同商品的用户名称。

(6) 然后,使用map()方法将每个客户与其已购商品列表对应起来。

(7) 此时,就有了一个List对象流。使用flatMap()方法将该流转换成一个ProductReview对象流。

(8) 最后,使用collect()方法和groupingByConcurrent()收集器生成一个商品Map。该Map的键是商品名称,而其值为ProductReview对象列表,该列表含有前面已获取到的客户评论。

为了完成该推荐算法,还需要最后一步。对于每个商品,都希望计算其在评论中的平均分值,并且按照降序对该列表进行排序,以便将排在前面的商品放在首要位置显示。为了进行这样的转换,要采用一个额外的流。

  1. ConcurrentLinkedDeque<ProductRecommendation> recommendations
  2. = recommendedProducts.entrySet().parallelStream()
  3. .map(entry -> new ProductRecommendation(entry
  4. .getKey(), entry.getValue().stream().mapToInt(p->
  5. p.getValue()).average().getAsDouble()))
  6. .sorted().collect(Collectors.toCollection
  7. (ConcurrentLinkedDeque::new));
  8. end=new Date();
  9. recommendations. forEach(pr -> System.out.println (pr.getTitle()
  10. +": "+pr.getValue()));
  11. System.out.println("Execution Time: "+(end.getTime()-
  12. start.getTime()));
  13. } catch (IOException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. }

处理上一步得到的Map。对于每个商品,对其评论列表进行处理,生成一个ProductRecommendation对象。需要通过一个流来计算每个评论的平均值作为该对象的值,这就要使用mapToInt()方法将ProductReview对象转换成一个整数流,并且使用average()方法求取字符串中所有数值的平均值。

最后,在关于推荐的ConcurrentLinkedDeque类中,有一个ProductRecommendation对象列表。使用其他带有sorted()方法的流对该列表进行排序。使用该流将最终列表输出到控制台。

9.3.3 ConcurrentLoaderAccumulator

为了实现本例,使用了ConcurrentLoaderAccumulator类,它在collect()方法中用作Accumulator函数,将含有全部待处理文件路径的Path对象流转换为关于Product对象的ConcurrentLinkedDeque类。该类的源代码如下:

  1. public class ConcurrentLoaderAccumulator implements
  2. BiConsumer<List<Product>, Path> {
  3. @Override
  4. public void accept(List<Product> list, Path path) {
  5. Product product=ProductLoader.load(path);
  6. list.add(product);
  7. }
  8. }

上述源代码实现了BiConsumer接口。其中accept()方法使用ProducLoader类(在本章前面做过解释)从文件中加载商品信息,并且将作为结果的Product对象添加到以参数传递的List类。

9.3.4 串行版

正如本书的其他例子一样,本例也实现了一个串行版本,以检验并行流对应用程序性能的提升情况。为了实现该串行版本,要遵循下述步骤。

(1) 将ConcurrentLinkedDeque数据结构替换为ListArrayList数据结构。

(2) 将parallelStrem()方法更改为stream()方法。

(3) 将gropingByConcurrent()方法更改为groupingBy()方法。

可以在本书配套的源代码中查看本例的串行版。

9.3.5 对比两个版本

为了对比推荐系统的串行版和并发版,我们获取了三个用户的推荐商品。

  • A2JOYUS36FLG4Z
  • A2JW67OY8U6HHK
  • A2VE83MZF98ITY

我们采用JMH框架执行了这些示例,该框架允许在Java中实现微型基准测试。使用面向基准测试的框架是比较好的解决方案,它直接用currentTimeMillis()方法或者nanoTime()方法度量时间。在两种不同的架构上分别执行这些示例10次。

  • 一台计算机配置了Intel Core i5-5300处理器、Windows 7操作系统和16GB的RAM。该处理器有两个核,且每个核可以执行两个线程,这样就有四个并行线程。
  • 另一台计算机配置了AMD A8-640处理器、Windows 10操作系统和8GB的RAM。该处理器有四个核。

用毫秒表示的结果如下。

A2JOYUS36FLG4ZA2JW67OY8U6HHKA2VE83MZF98ITY
Intel架构
串行版1639.6851542.8041595.341
并发版1030.6351061.2471054.213
AMD架构
串行版3361.9563412.6803351.890
并发版1866.6531871.9191999.916

可以得出如下结论。

  • 针对这三个用户得到的结果非常相似。
  • 并发流的执行时间总是比顺序流的执行时间更优。

如果对比并发版本和串行版本,例如,对第二个用户的结果使用加速比,可得到如下结果。

\begin{aligned}&S_{{\rm AMD}}=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{3412.680}{1871.919}=1.82\\&S_{{\rm Intel}}=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{1542.804}{1061.247}=1.45\end{aligned}