7.4 第三个例子:归并排序算法

归并排序算法是一种非常流行的排序算法,通常使用分治方法实现,因此它是一个用于测试Fork/Join框架的很好的候选算法。

为实现归并排序算法,我们将未排序的列表划分为仅有一个元素的子列表。然后,将这些未排序的子列表合并以产生排序后的子列表,直到将所有这些子列表处理完毕。最后得到最初的唯一列表,只不过其中所有的元素都进行了排序处理。

为了编写该算法的并发版本,我们采用了Java 8中引入的CountedCompleter任务。这类任务最重要的特征是,它们都含有一个可在所有子任务执行完毕之后执行的方法。

为了测试上述实现方法,我们使用了亚马逊产品联合采购网络元数据(可以在SNAP搜索“Amazon product co-purchasing network metadata”下载)。我们创建了一个含有542 184个产品的销售排名列表。我们将测试该算法的各个版本,对该产品列表进行排序,并且使用Arrays类的sort()方法和parallelSort()方法来比较执行时间。

7.4.1 共享类

正如前面提到的,已经构建了一个含有542 184款Amazon产品的列表,包括每个产品的ID、产品名称、分组、销售排名、浏览数量、相似产品编码,以及每个产品所属的类别编码。我们实现了AmazonMetaData类来存放产品信息。该类声明了必要的属性以及获取和设置这些属性值的方法。该类实现了Comparable接口以对比该类的两个实例。我们想要按照销售排名以升序排列这些元素。为了实现compare()方法,使用Long类的compare()方法比较两个对象的销售排名,如下所示:

  1. public int compareTo(AmazonMetaData other) {
  2. return Long.compare(this.getSalesrank(),
  3. other.getSalesrank());
  4. }

我们还实现了AmazonMetaDataLoader类,它提供了load()方法。该方法接收含有数据文件的路径作为参数,返回含有所有产品信息的AmazonMetaData对象数组。

7.4 第三个例子:归并排序算法 - 图1 为了集中介绍Fork/Join框架的特性,此处并没有给出这些类的源代码。

7.4.2 串行版本

我们在SerialMergeSort类中实现了归并排序算法的串行版本。SerialMergeSort类实现了算法本身和SerialMetaData类,并提供了测试该算法的main()方法。

  • SerialMergeSort

SerialMergeSort类实现了归并排序算法的串行版本。它提供的mergeSort()方法可接收如下参数。

  • 含有所有待排序数据的数组。
  • 该方法要处理的第一个元素(包含)。
  • 该方法要处理的最后一个元素(不包含)。
    如果该方法仅需要处理一个元素,则其立即返回。否则,它将两次递归调用mergeSort()方法。第一次调用处理前一半元素,第二次调用处理后一半元素。最后,调用merge()方法合并两部分元素,并且获得一个经过排序的元素列表。
  1. public void mergeSort (Comparable data[], int start, int end) {
  2. if (end-start < 2) {
  3. return;
  4. }
  5. int middle = (end+start)>>>1;
  6. mergeSort(data,start,middle);
  7. mergeSort(data,middle,end);
  8. merge(data,start,middle,end);
  9. }

使用(end+start)>>>1操作符获取位于数组中间位置的元素,进而分割数组。例如,如果有15亿个元素(对于当前的内存芯片来说并非不可能),这一操作仍然适用于Java数组。然而,采用(end+start)/2的方法将溢出,结果得到一个负数值的数组。可以阅读名为“Extra, Extra-Read All About It: Nearly All Binary Searches and Mergesorts are Broken”的文章获取有关该问题的详细解释。

merge()方法将两个元素列表合并以得到一个排序的列表。该方法可接收如下参数。

  • 含有所有待排序数据的数组。
  • 三个元素:startmidend,它们将待归并和排序的数组划分成两个部分(start-mid和mid-end)。
    我们创建了一个临时数组来对元素进行排序,然后,处理列表的两部分时,会在数组中对元素进行排序,并且在原始数组相同的位置上存放已排序的列表。如下述代码所示:
  1. private void merge(Comparable[] data, int start, int middle,
  2. int end) {
  3. int length=end-start+1;
  4. Comparable[] tmp=new Comparable[length];
  5. int i, j, index;
  6. i=start;
  7. j=middle;
  8. index=0;
  9. while ((i<middle) && (j<end)) {
  10. if (data[i].compareTo(data[j])<=0) {
  11. tmp[index]=data[i];
  12. i++;
  13. } else {
  14. tmp[index]=data[j];
  15. j++;
  16. }
  17. index++;
  18. }
  19. while (i<middle) {
  20. tmp[index]=data[i];
  21. i++;
  22. index++;
  23. }
  24. while (j<end) {
  25. tmp[index]=data[j];
  26. j++;
  27. index++;
  28. }
  29. for (index=0; index < (end-start); index++) {
  30. data[index+start]=tmp[index];
  31. }
  32. }
  33. }
  • SerialMetaData

SerialMetaData类提供了用于测试算法的main()方法。每个排序算法将执行10次并且计算平均执行时间。首先,从文件中加载数据并且创建该数组的一个副本。

  1. public class SerialMetaData {
  2. public static void main(String[] args) {
  3. for (int j=0; j<10; j++) {
  4. Path path = Paths.get("data","amazon-meta.csv");
  5. AmazonMetaData[] data = AmazonMetaDataLoader.load(path);
  6. AmazonMetaData data2[] = data.clone();

然后,使用Arrays类的sort()方法对第一个数组进行排序。

  1. Date start, end;
  2. start = new Date();
  3. Arrays.sort(data);
  4. end = new Date();
  5. System.out.println("Execution Time Java Arrays.sort(): " +
  6. (end.getTime() - start.getTime()));

接着,使用归并排序算法实现对第二个数组的排序。

  1. SerialMergeSort mySorter = new SerialMergeSort();
  2. start = new Date();
  3. mySorter.mergeSort(data2, 0, data2.length);
  4. end = new Date();
  5. System.out.println("Execution Time Java SerialMergeSort: " +
  6. (end.getTime() - start.getTime()));

最后,检查发现排序后的数组相似。

  1. for (int i = 0; i < data.length; i++) {
  2. if (data[i].compareTo(data2[i]) != 0) {
  3. System.err.println("There's a difference is position " +
  4. i);
  5. System.exit(-1);
  6. }
  7. }
  8. System.out.println("Both arrays are equal");
  9. }
  10. }
  11. }

7.4.3 并发版本

如前所述,我们将使用Java 8中的CountedCompleter类作为面向Fork/Join任务的基类。该类提供了某种机制,当其所有子任务完成执行后会执行某个方法。这种机制就是onCompletion()方法。因此,我们使用compute()方法分割数组,使用onCompletion()方法将子列表合并成一个经过排序的列表。

要实现的并发版解决方案有下述三个类。

  • MergeSortTask类,该类扩展了CountedCompleter类并且实现了执行归并排序算法的任务。
  • ConcurrentMergeSort类,该类启动了第一个任务。
  • ConcurrentMetaData类,该类提供了main()方法来测试归并排序算法的并发版本。

  • MergeSortTask

如前所述,该类实现了将用于执行归并排序算法的任务。该类用到了以下属性。

  • 存放待排序数据的数组。
  • 任务必须进行排序操作的这部分数组的起始位置和终止位置。
    该类还有一个用于初始化其参数的构造函数。
  1. public class MergeSortTask extends CountedCompleter<Void> {
  2. private Comparable[] data;
  3. private int start, end;
  4. private int middle;
  5. public MergeSortTask(Comparable[] data, int start, int end,
  6. MergeSortTask parent) {
  7. super(parent);
  8. this.data = data;
  9. this.start = start;
  10. this.end = end;
  11. }

如果起始索引和终止索引之间的差距大于或等于1024,那么使用compute()方法,将任务分割成两个子任务来分别处理原集合的两个子集。两个任务采用fork()方法以异步方式将任务发送给ForkJoinPool。否则,执行SerialMergeSorg.mergeSort()对数组(具有小于或等于1024个元素)进行排序,然后调用tryComplete()方法。子任务执行完毕之后,该方法将从内部调用onCompletion()方法。如下述代码所示:

  1. @Override
  2. public void compute() {
  3. if (end - start >= 1024) {
  4. middle = (end+start)>>>1;
  5. MergeSortTask task1 = new MergeSortTask(data, start, middle,
  6. this);
  7. MergeSortTask task2 = new MergeSortTask(data, middle, end,
  8. this);
  9. addToPendingCount(1);
  10. task1.fork();
  11. task2.fork();
  12. } else {
  13. new SerialMergeSort().mergeSort(data, start, end);
  14. tryComplete();
  15. }

在我们的例子中采用onCompletion()方法进行归并和排序操作,进而获得排序后的列表。一旦任务完成onCompletion()方法的执行后,它将在其父任务的层面上调用tryComplete()方法以完成该任务的执行。onCompletion()方法的源代码与该算法串行版本的merge()方法非常相似。代码如下所示:

  1. @Override
  2. public void onCompletion(CountedCompleter<?> caller) {
  3. if (middle==0) {
  4. return;
  5. }
  6. int length = end - start + 1;
  7. Comparable tmp[] = new Comparable[length];
  8. int i, j, index;
  9. i = start;
  10. j = middle;
  11. index = 0;
  12. while ((i < middle) && (j < end)) {
  13. if (data[i].compareTo(data[j]) <= 0) {
  14. tmp[index] = data[i];
  15. i++;
  16. } else {
  17. tmp[index] = data[j];
  18. j++;
  19. }
  20. index++;
  21. }
  22. while (i < middle) {
  23. tmp[index] = data[i];
  24. i++;
  25. index++;
  26. }
  27. while (j < end) {
  28. tmp[index] = data[j];
  29. j++;
  30. index++;
  31. }
  32. for (index = 0; index < (end - start); index++) {
  33. data[index + start] = tmp[index];
  34. }
  35. }
  • ConcurrentMergeSort

在并发版本中,该类非常简单。它实现了mergeSort()方法,该方法接收含有待排序数据的数组,以及起始索引(该值总是为0)和终止索引(该值总是为数组的长度)作为参数。此处选择保持与串行版相同的接口。

该方法创建一个新的MergeSortTask,使用invoke()方法将其发送给默认的ForkJoinPool,该方法在该任务完成执行且数组已被排序后返回。

  1. public class ConcurrentMergeSort {
  2. public void mergeSort (Comparable data[], int start, int end) {
  3. MergeSortTask task=new MergeSortTask(data, start, end,null);
  4. ForkJoinPool.commonPool().invoke(task);
  5. }
  6. }
  • ConcurrentMetaData

ConcurrentMetaData类提供了main()方法来测试归并排序算法的并发版本。在我们的例子中,该类的代码和SerialMetaData类的代码相当,只是采用了相关类的并发版本,并且使用Arrays.parallelSort()方法而非Arrays.sort()方法,因此此处不再给出该类的源代码。

7.4.4 对比两个版本

我们执行归并排序算法的串行版和并行版,比较这两个版本的执行时间,并且比较了使用Arrays.sort()方法和Arrays.parallelSort()方法时的执行时间。

使用JMH框架执行这四个版本,该框架允许在Java中实现微型基准测试。使用面向基准测试的框架是比较好的解决方案,因为可以直接使用currentTimeMillis()或者nanoTime()度量时间。在两种不同的架构上分别执行这些示例10次。

  • 一台计算机配置了Intel Core i5-5300处理器、Windows 7操作系统和16GB的RAM。该处理器有两个核且每个核可以执行两个线程,这样就有四个并行线程。
  • 另一台计算机配置了AMD A8-640处理器、Windows 10操作系统和8GB的RAM。该处理器有四个核。

下面给出的是对含有542 184个对象的数据集进行排序计算后得到的执行时间(以毫秒为单位)。

Arrays.sort() 串行版归并排序 Arrays.parallelSort() 并行版归并排序
AMD架构 858.1 1268.3 392.6 705.1
Intel架构 327.608 454.84 209.653 209.732

可以得出下述结论。

  • 使用Arrays.parallelSort()方法可以得到最佳结果。对于串行算法来说,Arrays.sort()方法比我们的实现方法在执行时间上更优。
  • 就我们的实现情况来说,算法的并发版本比串行版本性能更好。

我们可以用加速比来比较归并排序算法串行版本和并发版本的执行时间。

\begin{aligned}&S_{{\rm AMD}}=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{1268.3}{705.33}=1.80\\&S_{{\rm Intel}}=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{454.84}{298.732}=1.522\end{aligned}