2.2 第一个例子:矩阵乘法

矩阵乘法是针对矩阵做的基本运算之一,也是并发和并行编程课程中常采用的经典问题。如果你有一个mn 列的矩阵A,和另一个np 列的矩阵B,那么可以将两个矩阵相乘得到一个mp 列的矩阵C

本节将实现两个矩阵相乘的串行版本算法,以及三种不同的并发版本。然后,我们将比较四个解决方案,看看何时并发处理会带来更好的性能。

2.2.1 公共类

为了实现这个例子,我们用到了一个名为MatrixGenerator的类。使用它随机生成将进行乘法操作的矩阵。这个类有一种名为generate()的方法,它接收矩阵中所需的行数和列数作为参数,并基于这两个维数生成一个带有随机double值的矩阵。该类的源代码如下:

  1. public class MatrixGenerator {
  2. public static double[][] generate (int rows, int columns) {
  3. double[][] ret=new double[rows][columns];
  4. Random random=new Random();
  5. for (int i=0; i<rows; i++) {
  6. for (int j=0; j<columns; j++) {
  7. ret[i][j]=random.nextDouble()*10;
  8. }
  9. }
  10. return ret;
  11. }
  12. }

2.2.2 串行版本

我们在SerialMultiplier类中实现了该算法的串行版本。该类只有一种静态方法,名为multiply()。它接收三个double型矩阵作为参数:其中两个矩阵是将要相乘的矩阵,另一个矩阵用于存储结果。

我们并不检查矩阵的维数,只保证其正确性,并使用一个三重嵌套循环计算结果矩阵。SerialMultiplier类的源代码如下:

  1. public class SerialMultiplier {
  2. public static void multiply (double[][] matrix1, double[][] matrix2,
  3. double[][] result) {
  4. int rows1=matrix1.length;
  5. int columns1=matrix1[0].length;
  6. int columns2=matrix2[0].length;
  7. for (int i=0; i<rows1; i++) {
  8. for (int j=0; j<columns2; j++) {
  9. result[i][j]=0;
  10. for (int k=0; k<columns1; k++) {
  11. result[i][j]+=matrix1[i][k]*matrix2[k][j];
  12. }
  13. }
  14. }
  15. }
  16. }

我们还实现了一个名为SerialMain的主类,用于测试串行版矩阵乘法算法。在main()方法中,生成两个2000行2000列的随机矩阵,并使用SerialMultiplier类进行两个矩阵的乘法运算。算法执行时间的单位是毫秒,如下所示:

  1. public class SerialMain {
  2. public static void main(String[] args) {
  3. double matrix1[][] = MatrixGenerator.generate(2000, 2000);
  4. double matrix2[][] = MatrixGenerator.generate(2000, 2000);
  5. double resultSerial[][]= new double[matrix1.length]
  6. [matrix2[0].length];
  7. Date start=new Date();
  8. SerialMultiplier.multiply(matrix1, matrix2, resultSerial);
  9. Date end=new Date();
  10. System.out.printf("Serial: %d%n",end.getTime()-start.getTime());
  11. }
  12. }

2.2.3 并行版本

我们已经实现了三种不同的并行算法,基于不同的粒度实现这些例子。

  • 结果矩阵中每个元素对应一个线程。
  • 结果矩阵中每行对应一个线程。
  • 采用与JVM中可用处理器数或核心数相同的线程。

让我们来看看这三个版本的源代码。

  • 第一个并发版本:每个元素一个线程

在这个版本中,我们将在结果矩阵中为每个元素创建一个新的执行线程。例如,将两个2000行2000列的矩阵相乘,得到的矩阵将有4 000 000个元素,因此我们将创建4 000 000个Thread对象。因为如果同时启动所有线程,可能会使系统超载,所以将以10个线程一组的形式启动线程。

启动10个线程后,使用join()方法等待它们完成,而且一旦完成,就启动另外10个线程。我们一直遵循这个过程,直到启动所有必需线程。选择10作为批量处理线程数并没有特殊理由。你也可以更改这一数值,并查看更改后的数值对算法性能的影响。

我们将实现IndividualMultiplierTask类和ParallelIndividualMultiplier类。IndividualMultiplierTask类将实现每个Thread。该类实现了Runnable接口,将使用五个内部属性:两个要相乘的矩阵、结果矩阵,以及要计算的元素的行和列。我们将使用该类的构造函数来初始化所有这些属性:

  1. public class IndividualMultiplierTask implements Runnable {
  2. private final double[][] result;
  3. private final double[][] matrix1;
  4. private final double[][] matrix2;
  5. private final int row;
  6. private final int column;
  7. public IndividualMultiplierTask(double[][] result, double[][]
  8. matrix1, double[][] matrix2,
  9. int i, int j) {
  10. this.result = result;
  11. this.matrix1 = matrix1;
  12. this.matrix2 = matrix2;
  13. this.row = i;
  14. this.column = j;
  15. }

run()方法将计算由rowcolumn属性决定的元素值。下面的代码将展示如何实现该行为。

  1. @Override
  2. public void run() {
  3. result[row][column] = 0;
  4. for (int k = 0; k < matrix1[row].length; k++) {
  5. result[row][column] += matrix1[row][k] * matrix2[k][column];
  6. }
  7. }
  8. }

ParallelIndividualMultiplier类将创建所有必要的执行线程计算结果矩阵。它有一种名为multiply()的方法,接收两个将要相乘的矩阵和第三个用于存储结果的矩阵作为参数。该类将处理结果矩阵的所有元素,并创建一个单独的IndividualMultiplierTask类计算每个元素。如前所述,我们按照10个一组的方式启动线程。启动10个线程后,可使用waitForThreads()辅助方法等待这10个线程最终完成,该方法调用了join()方法。下面的代码块展示了该类的实现:

  1. public class ParallelIndividualMultiplier {
  2. public static void multiply(double[][] matrix1, double[][] matrix2,
  3. double[][] result) {
  4. List<Thread> threads=new ArrayList<>();
  5. int rows1=matrix1.length;
  6. int rows2=matrix2.length;
  7. for (int i=0; i<rows1; i++) {
  8. for (int j=0; j<columns2; j++) {
  9. IndividualMultiplierTask task=new IndividualMultiplierTask
  10. (result, matrix1, matrix2, i, j);
  11. Thread thread=new Thread(task);
  12. thread.start();
  13. threads.add(thread);
  14. if (threads.size() % 10 == 0) {
  15. waitForThreads(threads);
  16. }
  17. }
  18. }
  19. }
  20. private static void waitForThreads(List<Thread> threads){
  21. for (Thread thread: threads) {
  22. try {
  23. thread.join();
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. threads.clear();
  29. }
  30. }

与其他示例相同,我们创建了一个主类用以测试该示例。它与SerialMain类非常相似,但在本例中,我们将它称为ParallelIndividualMain类。此处不再给出该类的源代码。

  • 第二个并发版本:每行一个线程

在这一版本中,我们将在结果矩阵中为每一行创建一个新的执行线程。例如,如果将两个2000行和2000列的矩阵相乘,就要创建4 000 000个线程。正如前面的示例中所做的那样,我们将以10个线程为一组启动线程,然后等待它们终结,再启动新线程。

我们将实现RowMultiplierTask类和ParallelRowMultiplier类以实现该版本。RowMultiplierTask类将实现每个Thread。它实现了Runnable接口,并且将使用五个内部属性:两个要相乘的矩阵、结果矩阵,以及要计算的结果矩阵的行。我们将使用该类的构造函数来初始化所有这些属性,如下所示。

  1. public class RowMultiplierTask implements Runnable {
  2. private final double[][] result;
  3. private final double[][] matrix1;
  4. private final double[][] matrix2;
  5. private final int row;
  6. public RowMultiplierTask(double[][] result, double[][] matrix1,
  7. double[][] matrix2, int i) {
  8. this.result = result;
  9. this.matrix1 = matrix1;
  10. this.matrix2 = matrix2;
  11. this.row = i;
  12. }

run()方法有两个循环。第一个循环将处理待计算结果矩阵row中的所有元素,而第二个循环将计算每个元素的结果值。

  1. @Override
  2. public void run() {
  3. for (int j = 0; j < matrix2[0].length; j++) {
  4. result[row][j] = 0;
  5. for (int k = 0; k < matrix1[row].length; k++) {
  6. result[row][j] += matrix1[row][k] * matrix2[k][j];
  7. }
  8. }
  9. }
  10. }

ParallelRowMultiplier类将创建计算结果矩阵所需的所有执行线程。它有一种名为multiply()的方法,该方法接收两个待乘矩阵和第三个用于存储结果的矩阵作为参数。它将处理结果矩阵的所有行,并创建一个RowMultiplierTask处理每一行。如前所述,我们以10个为一组的方式启动线程。启动10个线程后,使用waitForThreads()辅助方法等待这10个线程最终完成,它将调用join()方法。下面的代码块展示了如何实现这个类:

  1. public class ParallelRowMultiplier {
  2. public static void multiply(double[][] matrix1, double[][]
  3. matrix2, double[][] result) {
  4. List<Thread> threads = new ArrayList<>();
  5. int rows1 = matrix1.length;
  6. for (int i = 0; i < rows1; i++) {
  7. RowMultiplierTask task = new RowMultiplierTask(result,
  8. matrix1, matrix2, i);
  9. Thread thread = new Thread(task);
  10. thread.start();
  11. threads.add(thread);
  12. if (threads.size() % 10 == 0) {
  13. waitForThreads(threads);
  14. }
  15. }
  16. }
  17. private static void waitForThreads(List<Thread> threads){
  18. for (Thread thread : threads) {
  19. try {
  20. thread.join();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. threads.clear();
  26. }
  27. }

与其他示例相同,我们创建了一个主类用以测试这个例子。它与SerialMain类非常相似,但在本例中,我们将它称为ParallelRowMain类。此处不再给出该类的源代码。

  • 第三个并发版本:线程的数量由处理器决定

在最后一个版本中,只创建与JVM可用核或处理器数量相同的线程。我们使用Runtime类的availableProcessors()方法计算这一数值。

GroupMultiplierTask类和ParallelGroupMultiplier类中实现了此版本。GroupMultiplierTask类实现了我们将要创建的线程。它实现了Runnable接口,并且使用了五个内部属性:两个要相乘的矩阵、结果矩阵,以及该任务将要计算的结果矩阵的初始行和最终行。我们将使用该类的构造函数初始化所有这些属性。下面的代码块展示了如何实现类的第一部分:

  1. public class GroupMultiplierTask implements Runnable {
  2. private final double[][] result;
  3. private final double[][] matrix1;
  4. private final double[][] matrix2;
  5. private final int startIndex;
  6. private final int endIndex;
  7. public GroupMultiplierTask(double[][] result, double[][]
  8. matrix1, double[][] matrix2,
  9. int startIndex, int endIndex) {
  10. this.result = result;
  11. this.matrix1 = matrix1;
  12. this.matrix2 = matrix2;
  13. this.startIndex = startIndex;
  14. this.endIndex = endIndex;
  15. }

run()方法将使用三个循环实现其计算。第一个循环将检查该任务将要计算的结果矩阵的行,第二个循环将处理每一行的所有元素,最后一个循环将计算每个元素的值。

  1. @Override
  2. public void run() {
  3. for (int i = startIndex; i < endIndex; i++) {
  4. for (int j = 0; j < matrix2[0].length; j++) {
  5. result[i][j] = 0;
  6. for (int k = 0; k < matrix1[i].length; k++) {
  7. result[i][j] += matrix1[i][k] * matrix2[k][j];
  8. }
  9. }
  10. }
  11. }
  12. }

ParallelGroupMutiplier类将创建线程计算结果矩阵。它有一种名为multiply()的方法,接收要相乘的两个矩阵和第三个用于存放结果的矩阵作为参数。首先,通过使用Runtime类的availableProcessors()方法获取可用处理器的数量。然后,计算每个任务必须处理的行,以及创建并启动这些线程。最后,使用join()方法等待线程结束。

  1. public class ParallelGroupMultiplier {
  2. public static void multiply(double[][] matrix1, double[][] matrix2,
  3. double[][] result) {
  4. List<Thread> threads=new ArrayList<>();
  5. int rows1=matrix1.length;
  6. int numThreads=Runtime.getRuntime().availableProcessors();
  7. int startIndex, endIndex, step;
  8. step=rows1 / numThreads;
  9. startIndex=0;
  10. endIndex=step;
  11. for (int i=0; i<numThreads; i++) {
  12. GroupMultiplierTask task=new GroupMultiplierTask
  13. (result, matrix1, matrix2, startIndex, endIndex);
  14. Thread thread=new Thread(task);
  15. thread.start();
  16. threads.add(thread);
  17. startIndex=endIndex;
  18. endIndex= i==numThreads-2?rows1:endIndex+step;
  19. }
  20. for (Thread thread: threads) {
  21. try {
  22. thread.join();
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }
  28. }

与其他示例相同,我们创建了一个主类测试这个例子。它与SerialMain类非常相似,但在本例中,我们将它称为ParallelGroupMain类。此处不再给出该类的源代码。

  • 比较方案

比较一下本节中实现的乘法器算法四个版本的解决方案(包括串行版和并发版)。为了测试该算法,我们已经使用JMH框架执行这些示例,该框架可支持在Java中实现微基准测试。使用基准测试框架是一种很好的解决方案,可以直接使用currentTimeMillis()nanoTime()等方法度量时间。在两种不同架构中,执行这些例子各10次。

  • 一台计算机配置有Intel Core i5-5300i处理器、Windows 7操作平台和16GB内存。该处理器有两个核,每个核可以执行两个线程,所以将有四个并行线程。
  • 另一台计算机配置有AMD A8-640处理器、Windows 10操作系统和8GB内存,此处理器有四个核。
    我们已用三种不同大小的随机矩阵测试了算法:

  • 500×500

  • 1000×1000
  • 2000×2000
    下表给出了平均执行时间以及标准偏差(单位:毫秒)。

算法规模AMDIntel 串行版5001821.729±366.885447.920±49.864 100027 661.481±796.6705474.942±164.447 2000315 457.940±32 961.16570 968.563±4056.883 按个体处理的并行版50043 512.382±813.13117 152.883±170.408 1000164 968.834±1034.45372 858.419±381.258 2000774 681.287±17 380.02316 466.479±5033.577 按行处理的并行版500685.465±72.474229.228±61.497 10008565±437.6113710.613±411.490 200092 923.685±11 595.43342 655.081±1370.940 按分组处理的并行版500515.743±51.106133.530±12.271 10007466.880±409.1363862.635±368.427 200086 639.811±2834.143 353.603±1857.568

由上表可以得出以下结论。

  • 这两种架构有很大不同,但是你必须考虑到两台电脑处理器、操作系统、内存和硬盘等的配置不同。
  • 在两种架构上得到的结果相同。按分组处理的并行版和按行处理的并行版得到了最佳结果,而按个体处理的并行版得到的结果最差。
    这个例子告诉我们,开发一个并发应用程序时必须非常小心。如果没有选择良好的解决方案,那么性能表现会很糟糕。

针对500×500矩阵,我们用性能最佳的并发版本和串行版本求取加速比,以此来考查并发处理对算法性能的改进情况。

\begin{aligned}&S_{{\rm AMD}}=\frac{T_{\rm serial}}{T_{\rm concurrent}}=\frac{1821.729}{515.743}=3.53\\&S_{{\rm Intel}}=\frac{T_{\rm serial}}{T_{\rm concurrent}}=\frac{447.920}{133.530}=3.35\end{aligned}