8.2 第一个例子:数值综合分析应用程序

拥有一个大规模数据集时,最常见的需求之一就是对其元素进行处理,以计算某些特征的指标。例如,如果你有一个商店的已售产品集合,可以计算已售产品的数量、每种产品的销量,或者每个客户对每种产品的平均购买量。我们将这个过程称作数值综合分析

本章将使用流来计算UCI机器学习资源库的Online Retail数据集的一些指标。该数据集存储了2010年1月12日到2011年9月12日期间英国一家在线零售商店的交易数据。

与其他各章不同,本例先介绍使用流的并发版本程序,然后介绍如何实现一个与之相当的串行版程序,以验证并发性也使用流提升了性能。正如在本章开头提到的,要注意并发处理对于编程人员来说是透明的。

8.2.1 并发版本

数值综合分析应用程序非常简单,其组成部分如下所示。

  • Record:该类定义了文件中每条记录的内部结构。它定义了每条记录的8个属性以及用于获取和设定这些属性值的get()set()方法。该类的代码非常简单,因此在本书中并未给出。
  • ConcurrentDataLoader:该类用于加载含有数据的Online_Retail.csv文件,并且将其转换成一个Record对象列表。我们将使用流来加载数据并完成转换。
  • ConcurrentStatistics:该类实现了用于数据计算的各项操作。
  • ConcurrentMain:该类实现了main()方法,来调用ConcurrentStatistics类的各项操作并且测量其执行时间。

下面详细介绍一下其中后三个类。

  • ConcurrentDataLoader

ConcurrentDataLoader类实现了load()方法,该方法将加载带有Online Retail数据集的文件并且将其转换成一个Record对象列表。首先,使用Files类的readAllLines()方法加载该文件,并且将其内容转换为一个字符串列表。该文件的每一行都将被转换为该列表的一个元素。

  1. public class ConcurrentDataLoader {
  2. public static List<Record> load(Path path) throws IOException {
  3. System.out.println("Loading data");
  4. List<String> lines = Files.readAllLines(path);

然后,通过对该流应用必要的操作以得到Record对象列表。

  1. List<Record> records = lines.parallelStream()
  2. .skip(1).map(l -> l.split(";"))
  3. .map(t -> new Record(t))
  4. .collect(Collectors.toList());

在这里用到的操作有如下几项。

  • parallelStream():创建一个并行流来处理该文件的所有行。
  • skip(1):忽略该流的第一项;在本例中,即文件的第一行,其中包含了文件的头信息。
  • map (1 → 1.split(";")):对String[]数组中的各个字符串进行转换,用;字符分割各行。使用lambda表达式,其中1代表输入参数,而1.split()将生成关于这些字符串的数组。在一个字符串的流中调用该方法,将生成一个String[]流。
  • map(t → new Record(t)):使用Record类的构造函数将每个字符串数组转换成一个Record对象。使用一个lambda表达式,其中t代表字符串数组。在一个关于String[]的流中调用该方法,生成一个Record对象流。
  • collect(Collectors.toList()):该方法将流转换成一个列表。第9章会更详细地介绍collect()方法。
    如你所见,我们以一种紧凑、优雅且并发的方式完成了转换,而且并没有用到任何线程、任务或者框架。最后,返回Record对象列表,如下所示:
  1. return records;
  2. }
  3. }
  • ConcurrentStatistics

ConcurrentStatistics类实现了对数据进行微积分计算的各种方法。有七种操作可用于获得有关数据集的信息,下面分别进行介绍。

  • 来自英国的客户

该方法的主要目的是获得每位英国客户订购的产品数量。

该方法的源代码如下:

  1. public static void customersFromUnitedKingdom(List<Record> records) {
  2. System.out.println("**************************************");
  3. System.out.println("Customers from UnitedKingdom");
  4. Map<String, List<Record>> map = records.parallelStream().filter(r ->
  5. r.getCountry().equals("the United
  6. Kingdom")).collect(Collectors.groupingBy(Record::getCustomer));
  7. map.forEach((k, l) -> System.out.println(k + ": " + l.size()));
  8. System.out.println("**************************************");
  9. }

该方法接收Record对象的列表作为输入参数。首先,使用流获取一个ConcurrentMap>对象,其中有客户ID以及含有每个客户相关记录的列表。该流首先从parallelStream()方法创建一个并行流。然后,使用filter()方法选择那些country属性值为'the United Kingdom'Record对象。最后,使用collect()方法,传递Collectors.groupingByConcurrent()方法的功能,按照job属性的取值对流的实际元素进行分组。需要考虑的是groupingByConcurrent()方法是无序的收集器。收集到列表中的记录可以以任意顺序排列,而非原始顺序(和简单的groupingBy()收集器不同)。

一旦获得了ConcurrentMap对象,就可以使用forEach()方法将信息输出到屏幕。

  • 来自英国的订单的产品数量

该方法的主要目的是获得来自英国的订单的产品数量的统计信息(最大值、最小值和平均值)。

该方法的源代码如下:

  1. public static void quantityFromUnitedKingdom(List<Record> records) {
  2. System.out.println("****************************************");
  3. System.out.println("Quantity from the United Kingdom");
  4. DoubleSummaryStatistics statistics = records.parallelStream()
  5. .filter(r -> r.getCountry().equals("the United Kingdom"))
  6. .collect(Collectors.summarizingDouble(Record::getQuantity));
  7. System.out.println("Min: " + statistics.getMin());
  8. System.out.println("Max: " + statistics.getMax());
  9. System.out.println("Average: " + statistics.getAverage());
  10. System.out.println("****************************************");
  11. }

该方法接收Record对象列表作为输入参数,并且使用流来获取带有统计信息的DoubleSummaryStatistics对象。首先,使用parallelStream()方法获取并行流。然后,使用filter()方法获取来自英国的记录。最后,使用以Collectors.summarizingDouble()为参数的collect()方法获取DoubleSummaryStatistics对象。该类实现了DoubleConsumer接口,并且收集在accept()方法中接收到的数值的统计数据。该流的collect()方法在内部调用了accept()方法。Java还提供了IntSummaryStatistics类和LongSummaryStatistics类,同样也是为了从int型和long型数值中获取统计数据。

本例使用max()min()average()方法分别获取最大值、最小值和平均值。

  • 订购产品的国家

该方法的主要目的是获取订购了ID为85123A的产品的国家列表。

该方法的源代码如下:

  1. public static void countriesForProduct(List<Record> records) {
  2. System.out.println("****************************************");
  3. System.out.println("Countries for product 85123A");
  4. records.parallelStream().filter(r -> r.getStockCode()
  5. .equals("85123A")).map(r -> r
  6. .getCountry()).distinct().sorted()
  7. .forEachOrdered(System.out::println);
  8. System.out.println("****************************************");
  9. }

该方法接收一个Record对象列表作为输入参数,并且使用parallelStream()方法获取并行流。然后,使用filter()方法仅获取与该产品相关的记录。然后,使用map()方法获取一个String对象流,其中含有与记录相关的国家名称。借助distinct()方法,仅选取唯一值,而借助sorted()方法,可以按照字母顺序对这些值进行排序。

最后,使用forEachOrdered()方法输出结果。请注意,此处不要使用forEach()方法,因为它输出的结果没有特定顺序,这将使sorted()这一步的工作成为无用功。元素顺序并不重要时,forEach()操作就很有用了。对于并行流来说,它比forEachOrdered()方法的处理速度更快。

  • 产品数量

使用流时,最常见的错误之一是试图重用流。我们会展示这种做法所产生的错误结果。该方法的主要目的是获取ID为85123A的产品记录相关的最大和最小产品数目。

该方法的第一个版本是尝试重用一个流,其源代码如下:

  1. public static void quantityForProduct(List<Record> records) {
  2. System.out.println("****************************************");
  3. System.out.println("Quantity for Product");
  4. IntStream stream = records.parallelStream().filter(r -> r
  5. .getStockCode().equals("85123A"))
  6. .mapToInt(r -> r.getQuantity());
  7. System.out.println("Max quantity: " + stream.max().getAsInt());
  8. System.out.println("Min quantity: " + stream.min().getAsInt());
  9. System.out.println("****************************************");
  10. }

该方法接收一个Record对象列表作为输入参数。首先,使用该列表创建一个IntStream对象。借助parallelStream()方法,创建一个并行流。然后,使用filter()方法获取与产品相关的记录,使用mapToInt()方法将一个Record对象的流转换成一个IntStream对象,用getQuantity()方法的值替换每个对象。

借助max()方法,可以用该流获取最大值,而借助min()方法,可以获取最小值。如果再次执行该方法,将立刻得到IllegalStateException异常,并且获得“已经对流进行操作”或者“流已关闭”的消息。

可以通过创建两个不同的流来解决这一问题,其中一个流用于获取最大值,而另一个流用于获取最小值。这一供选方案的源代码如下所示:

  1. public static void quantityForProductOk(List<Record> records) {
  2. System.out.println("****************************************");
  3. System.out.println("Quantity for Product Ok");
  4. int value = records.parallelStream().filter(r ->
  5. r.getStockCode().equals("85123A")).mapToInt(r -> r.getQuantity()).max()
  6. .getAsInt();
  7. System.out.println("Max quantity: " + value);
  8. value = records.parallelStream().filter(r -> r.getStockCode()
  9. .equals("85123A")).mapToInt(r -> r
  10. .getQuantity()).min().getAsInt();
  11. System.out.println("Min quantity: " + value);
  12. System.out.println("****************************************");
  13. }

另一个供选方案是使用summaryStatistics()方法获取IntSummaryStatistics对象,这与上文所给出的方法相同。

  • 多个数据筛选器

该方法的主要目标是获取至少满足如下条件之一的记录数。

  1. - <code>quantity</code>属性值大于50的记录数。
  2. - <code>unitPrice</code>属性值大于10的记录数。

实现该方法的一种解决方案是实现一个筛选器来检验元素是否满足这些条件。另一种解决方案可以借助Stream接口提供的concat()方法。源代码如下所示:

  1. public static void multipleFilterData(List<Record> records) {
  2. System.out.println("****************************************");
  3. System.out.println("Multiple Filter");
  4. Stream<Record> stream1 = records.parallelStream()
  5. .filter(r -> r.getQuantity() > 50);
  6. Stream<Record> stream2 = records.parallelStream()
  7. .filter(r -> r.getUnitPrice() > 10);
  8. Stream<Record> complete = Stream.concat(stream1, stream2);
  9. Long value = complete.parallel().unordered().map(r -> r
  10. .getStockCode()).distinct().count();
  11. System.out.println("Number of products: " + value);
  12. System.out.println("****************************************");

该方法接收Record对象列表作为输入参数。首先,创建两个流,其中分别含有满足上述条件的元素,然后使用concat()方法将它们合并成单一的流。concat()方法创建的流只是将第二个流的元素直接跟到第一个流的元素后。出于这种原因,对于最后的流,可以使用parallel()将其转换成一个并行流,使用unordered()方法获得一个未排序的流以便在对并行流应用distinct()方法时获得更好的性能,使用map()方法将每条记录转换为一个含有产品stockCode的字符串值,使用distinct()方法获得唯一值,使用count()方法获得流中的元素数。

这并不是最优的解决方案。我们只是用它展示了concat()distinct()方法如何工作。可以使用下面的代码以更优的方式实现同样的结果。

  1. public static void multipleFilterDataPredicate (List<Record> records) {
  2. System.out.println("****************************************");
  3. System.out.println("Multiple filter with Predicate");
  4. Predicate<Record> p1 = r -> r.getQuantity() > 50;
  5. Predicate<Record> p2 = r -> r.getUnitPrice() > 10;
  6. Predicate<Record> pred = Stream.of(p1, p2)
  7. .reduce(Predicate::or).get();
  8. long value = records.parallelStream().filter(pred).count();
  9. System.out.println("Number of products: " + value);
  10. System.out.println("****************************************");
  11. }

我们创建一个含有两个谓词的流,并且通过Predicate::or操作约简,进而构建复合谓词,当任何一个输入谓词为true时,该谓词都为true。也可以使用Predicate::and约简操作构建一个复合谓词,如此当所有输入谓词都为true时,复合谓词才为true

  • 最高发货量

该方法的主要目的是获取发货量最高的10张发货单。

首先,构建一个Map,其键为发货单的ID,其值为与发货单相关联的所有记录的列表。

  1. public static void getBiggestInvoiceAmmounts(List<Record> records) {
  2. System.out.println("****************************************");
  3. System.out.println("Biggest Invoice Ammounts");
  4. Map<String, List<Record>> map = records.stream().unordered()
  5. .parallel().collect(Collectors
  6. .groupingByConcurrent(r -> r.getId()));

使用unordered()方法删除列表现有的顺序,以便在并行操作时获得更好的性能。然后,使用parallel()方法将该流转换成并行流,最后使用采用了groupingByConcurrent()收集器的collect()方法获得最终的Map。

第二步,创建关于Invoice对象的ConcurrentLinkedDeque数据结构。这部分源码如下所示:

  1. ConcurrentLinkedDeque<Invoice> invoices= new ConcurrentLinkedDeque();
  2. map.values().parallelStream().forEach( list -> {
  3. Invoice invoice = new Invoice();
  4. invoice.setId(list.get(0).getId());
  5. double ammount=list.stream().mapToDouble(r -> r.getUnitPrice()* r
  6. .getQuantity()).sum();
  7. invoice.setAmmount(ammount);
  8. invoice.setCustomerId(list.get(0).getCustomer());
  9. invoices.add(invoice);
  10. });

这里我们有两个流。首先,使用并行流处理上一个Map中的所有值。对于每个含有发货单记录的列表,使用发货单ID、客户ID和发货总量等属性创建一个Invoice对象。为了计算每个发货单的总量,使用另一个流和mapToDouble()方法将每条记录更改为每种产品的单位数量和unitPrice属性,并且使用sum()方法对最终Stream中的所有值进行汇总。之所以使用ConcucrrentLinkedDeque结构,是因为它允许进行并发插入操作并且不会引起数据竞争,而这一特性对于当前情况非常重要。

最后,获取发货量最高的10张发货单,这部分代码如下所示:

  1. System.out.println("Invoices: "+invoices.size()+": "+map.getClass());
  2. invoices.stream().sorted(Comparator.comparingDouble
  3. (Invoice::getAmmount).reversed()).limit(10).forEach(i ->
  4. System.out.println("Customer:"+i.getCustomerId() +
  5. "; Ammount: "+ i.getAmmount()));
  6. System.out.println("****************************************");
  7. }

使用ConcurrentLinkedDeque数据结构创建流。使用sorted()方法进行排序,以将发货量最大的发货单排在最前面,将发货量较小的发货单放在后面。再使用limit()方法选取发货量最高的10张发货单,并且使用forEach()方法将它们输出到控制台。这里是对排序后的流进行操作,因此采用了顺序流。采用并发流并不会带来更好的性能。

  • 单价在1到10之间的产品

该方法的主要目标是获取文件中单价在1到10之间的产品数。

该方法的源代码如下所示:

  1. public static void productsBetween1and10(List<Record> records) {
  2. System.out.println("****************************************");
  3. System.out.println("Products between 1 and 10");
  4. int count=records.stream().unordered().parallel().filter(r -> (r
  5. .getUnitPrice() >=1 ) && (r.getUnitPrice() <=10))
  6. .map(i -> i.getStockCode()).distinct()
  7. .mapToInt(a -> 1).reduce(0, Integer::sum);
  8. System.out.println("Products between 1 and 10: "+count);
  9. System.out.println("****************************************");
  10. }

该方法接收Record对象列表作为输入参数,并且使用stream()unordered()parallel()方法获取一个并行流,且不受该流现有的排序限制。然后,使用filter()方法仅选取unitPrice值在110之间的记录。接下来,使用map()方法将每个记录替换为其stockCode属性的值。之后,使用distinct()方法删除重复记录,并且使用map()方法将每个取值转换为值1。最后,使用reduce()方法将所有1值汇总起来并且返回最终结果。

reduce()方法的第一个参数是其ID,第二个参数是用于从流的所有元素中获取单个值的操作。

本例使用Integer::sum操作。第一次是对初始值和流的第一个值求和,第二次则是对第一次求和的结果与流的第二个值进行求和,以此类推。

  • ConcurrentMain

ConcurrentMain类实现了main()方法,用于测试ConcurrentStatistic类。首先,实现measure()方法,以测量一个任务的执行时间。

  1. public class ConcurrentMain {
  2. static Map<String, List<Double>> totalTimes = new LinkedHashMap<>();
  3. static List<Record> records;
  4. private static void measure(String name, Runnable r) {
  5. long start = System.nanoTime();
  6. r.run();
  7. long end = System.nanoTime();
  8. totalTimes.computeIfAbsent(name, k -> new ArrayList<>())
  9. .add((end - start) / 1_000_000.0);
  10. }

使用一个Map存放每个方法的执行时间。每个方法将执行10次,以观察在第一次执行之后执行时间如何缩减。然后,给出main()方法的代码。它使用measure()方法度量每个方法的执行时间并且将该过程重复10次。

  1. public static void main(String[] args) throws IOException {
  2. Path path = Paths.get("data\\Online_Retail.csv");
  3. for (int i = 0; i < 10; i++) {
  4. measure("Customers from UnitedKingdom", () -> ConcurrentStatistics
  5. .customersFromUnitedKingdom(records));
  6. measure("Quantity from UnitedKingdom", () -> ConcurrentStatistics
  7. .quantityFromUnitedKingdom(records));
  8. measure("Countries for Product", () -> ConcurrentStatistics
  9. .countriesForProduct(records));
  10. measure("Quantity for Product", () -> ConcurrentStatistics
  11. .quantityForProductOk(records));
  12. measure("Multiple Filter for Products", () -> ConcurrentStatistics
  13. .multipleFilterData(records));
  14. measure("Multiple Filter for Products with Predicate", () ->
  15. ConcurrentStatistics.multipleFilterDataPredicate(records));
  16. measure("Biggest Invoice Ammount", () -> ConcurrentStatistics
  17. .getBiggestInvoiceAmmounts(records));
  18. measure("Products Between 1 and 10", () -> ConcurrentStatistics
  19. .productsBetween1and10(records));
  20. }

最后,将所有执行时间和平均执行时间输出到控制台,如下所示:

  1. times.stream().map(t -> String.format("%6.2f", t))
  2. .collect(Collectors.joining(" ")),
  3. times.stream().mapToDouble(Double::doubleValue)
  4. .average().getAsDouble()));
  5. }
  6. }

8.2.2 串行版本

在本例中,串行版和并发版几乎相同,只是将对parallelStream()方法的调用替换成了对stream()方法的调用,以便获得顺序流而非并行流。我们还要删除在一个样例中用到的对parallel()方法的调用,并且将调用groupingByConcurrent()方法更改为调用groupingBy()方法。

8.2.3 对比两个版本

执行两个版本的操作,以测试并行流是否可以提供更好的性能。

使用JMH框架执行该操作,该框架允许你在Java中实现微型基准测试。使用面向基准测试的框架是比较好的解决方案,它直接用currentTimeMillis()或者nanoTime()方法度量时间。在两种不同的架构上分别执行这些示例10次。

  • 一台计算机配置了Intel Core i5-5300处理器、Windows 7操作系统和16GB的RAM。该处理器有两个核,且每个核可以执行两个线程,这样就有四个并行线程。
  • 另一台计算机配置了AMD A8-640处理器、Windows 10操作系统和8GB的RAM。该处理器有四个核。

以下便是运行结果,以毫秒为单位。

操作Intel架构AMD架构
顺序流并行流顺序流并行流
订购产品的国家19.14615.51780.99445.833
来自英国的客户242.593240.003783.044750.199
最大发货量81.61270.853358.488174.395
多筛选器数据24.37120.026101.65860.098
带有谓词的多筛选器数据11.3389.46256.8134.715
单价介于0到10之间的产品45.06527.394187.9185.299
产品总量24.61422.675126.08865.897
英国订购的总量24.48814.722132.16155.278

我们可以看到并行流总是比串行流具有更好的性能。下面给出的是所有示例的加速比。

操作 Intel加速比 AMD加速比
订购产品的国家 1.23 1.77
来自英国的客户 1.01 1.04
最大发货量 1.15 2.06
多筛选器数据 1.21 1.69
带有谓词的多筛选器数据 1.19 1.64
单价介于1到10之间的产品 1.64 2.20
产品总量 1.08 1.91
英国订购的总量 1.66 2.39