2.3 第二个例子:文件搜索

所有操作系统都提供了一种功能,即在文件系统中搜索符合某种条件的文件。(例如,按照名称或部分名称、修改日期等进行搜索。)在我们的示例中将实现一个算法,用于查找具有预定名称的文件。该算法将采用启动搜索的初始路径和要查找的文件作为输入。JDK提供了遍历目录树结构的功能,因此不需要再次在实际应用中自己实现它。

2.3.1 公共类

这两个版本的算法将共享一个公共类用以存储搜索结果。我们将其称为Result类,而它有两个属性:一个名为foundBoolean值,用于判定是否找到了正在查找的文件;一个名为pathString值。如果找到了该文件,就将其完整路径存放在该属性中。

这个类的代码非常简单,所以此处不再给出源代码。

2.3.2 串行版本

这个算法的串行版本非常简单。搜索初始路径,获取文件和目录内容,并对其进行处理。对于文件来说,会将其名称与正在寻找的名称进行比较。如果相同,则将其填入Result对象并完成算法执行。对于各目录来说,我们对本操作进行递归调用,以便在这些目录中搜索文件。

我们将在SerialFileSearch类的searchFiles()方法中实现这个操作。SerialFileSearch类的源代码如下:

  1. public class SerialFileSearch {
  2. public static void searchFiles(File file, String fileName,
  3. Result result) {
  4. File[] contents;
  5. contents=file.listFiles();
  6. if ((contents==null) || (contents.length==0)) {
  7. return;
  8. }
  9. for (File content : contents) {
  10. if (content.isDirectory()) {
  11. searchFiles(content,fileName, result);
  12. } else {
  13. if (content.getName().equals(fileName)) {
  14. result.setPath(content.getAbsolutePath());
  15. result.setFound(true);
  16. System.out.printf("Serial Search: Path: %s%n",
  17. result.getPath());
  18. return;
  19. }
  20. }
  21. if (result.isFound()) {
  22. return;
  23. }
  24. }
  25. }
  26. }

2.3.3 并发版本

并行化该算法有多种方法(如下所示)。

  • 你可以为我们要处理的每个目录创建一个执行线程。
  • 你可以将目录树分组,并为每个组创建执行线程。你创建的组数将决定应用程序使用的执行线程数。
  • 你可以使用与JVM的可用核数相同的线程数。

在这种情况下,我们必须考虑到算法将集中使用I/O操作。因为一次只有一个线程可以读取磁盘,所以不是所有解决方案都会提高算法串行版本的性能。

我们将按照最后一种供选方案实现并发版本。将在一个ConcurrentLinkedQueue(一个可以在并发应用程序中使用的队列Queue接口实现)中存储初始路径所包含的目录,并创建与JVM可用处理器数量相同的线程。每个线程将从队列中获取一条路径,并处理该目录及其所有子目录和其中的文件。线程处理完毕该目录中的所有文件和目录时,将从队列中提取另一个目录。

如果其中一个线程找到了正在查找的文件,该线程会立即终止执行。在这种情况下,我们使用interrupt()方法结束其他线程的执行。

我们在ParallelGroupFileTask类和ParallelGroupFileSearch类中实现了该版本的算法。ParallelGroupFileTask类实现了所有将用于查找文件的线程。它实现了Runnable接口并且使用了四个内部属性:一个名为fileNameString属性,用于存储待查找文件的名称;一个名为directoriesFile对象的ConcurrentLinkedQueue,用于存放将要处理的目录列表;一个名为parallelResultResult对象,用于存储搜索结果;一个名为foundBoolean属性,用于标记是否发现了正在寻找的文件。我们将使用该类的构造函数初始化所有属性:

  1. public class ParallelGroupFileTask implements Runnable {
  2. private final String fileName;
  3. private final ConcurrentLinkedQueue<File> directories;
  4. private final Result parallelResult;
  5. private boolean found;
  6. public ParallelGroupFileTask(String fileName, Result parallelResult,
  7. ConcurrentLinkedQueue<File>directories) {
  8. this.fileName = fileName;
  9. this.parallelResult = parallelResult;
  10. this.directories = directories;
  11. this.found = false;
  12. }

run()方法有一个循环,在队列中有元素并且没有找到该文件时会被执行。它使用ConcurrentLinkedQueue类的poll()方法处理下一个目录,并调用辅助方法processDirectory()。如果找到了这个文件(found属性为true),那么使用return语句结束线程。

  1. @Override
  2. public void run() {
  3. while (directories.size() > 0) {
  4. File file = directories.poll();
  5. try {
  6. processDirectory(file, fileName, parallelResult);
  7. if (found) {
  8. System.out.printf("%s has found the file%n",
  9. Thread.currentThread().getName());
  10. System.out.printf("Parallel Search: Path: %s%n",
  11. parallelResult.getPath());
  12. return;
  13. }
  14. } catch (InterruptedException e) {
  15. System.out.printf("%s has been interrupted%n",
  16. Thread.currentThread().getName());
  17. }
  18. }
  19. }

如果找到了作为参数的文件,processDirectory()方法将接收存放待处理目录的File对象、正在查找的文件名和存放结果的Result对象。它使用listFiles()方法获取File对象的内容。listFiles()方法可返回File对象数组并对其进行处理。对于目录来说,这将建立一个新对象对该方法进行递归调用。对于文件来说,该方法将调用辅助的processFile()方法:

  1. private void processDirectory(File file, String fileName,
  2. Result parallelResult) throws
  3. InterruptedException {
  4. File[] contents;
  5. contents = file.listFiles();
  6. if ((contents == null) || (contents.length == 0)) {
  7. return;
  8. }
  9. for (File content : contents) {
  10. if (content.isDirectory()) {
  11. processDirectory(content, fileName, parallelResult);
  12. if (Thread.currentThread().isInterrupted()) {
  13. throw new InterruptedException();
  14. }
  15. if (found) {
  16. return;
  17. }
  18. } else {
  19. processFile(content, fileName, parallelResult);
  20. if (Thread.currentThread().isInterrupted()) {
  21. throw new InterruptedException();
  22. }
  23. if (found) {
  24. return;
  25. }
  26. }
  27. }
  28. }

在处理完每个目录和文件之后,还要检查线程是否被中断。我们使用Thread类的currentThread()方法获取执行该任务的Thread对象,然后使用isInterrupted()方法来验证线程是否被中断。如果线程被中断,我们将抛出一个新的InterruptedExeption异常,在run()方法中捕捉该异常以结束线程的执行。这种机制使我们能够在找到文件后完成搜索。

我们还要检查found属性是否为true。如果为true,我们将立即返回以完成线程的执行。

如果找到了作为参数的文件,processFile()方法接收存储待处理文件的File对象、待查找文件的名称、存放操作结果的Result对象。我们将当前处理File的名称与正在查找的文件名称进行比较。如果两个名称相同,那么填入Result对象并且将found属性设置为true,如下所示:

  1. private void processFile(File content, String fileName,
  2. Result parallelResult) {
  3. if (content.getName().equals(fileName)) {
  4. parallelResult.setPath(content.getAbsolutePath());
  5. this.found = true;
  6. }
  7. }
  8. public boolean getFound() {
  9. return found;
  10. }
  11. }

ParallelGroupFileSearch类使用辅助任务实现了整个算法。它将实现静态的searchFiles()方法,接收一个指向搜索基本路径的File对象、一个存储当前查找文件名称的fileNameString、存放操作结果的Result对象作为参数。

首先,创建ConcurrentLinkedQueue对象,并且将基本路径所包含的所有目录存放在其中,如下所示:

  1. public class ParallelGroupFileSearch {
  2. public static void searchFiles(File file, String fileName,
  3. Result parallelResult) {
  4. ConcurrentLinkedQueue<File> directories = new
  5. ConcurrentLinkedQueue<>();
  6. File[] contents = file.listFiles();
  7. for (File content : contents) {
  8. if (content.isDirectory()) {
  9. directories.add(content);
  10. }
  11. }

然后,我们使用Runtime类的availableProcessors()方法获得JVM可用线程的数量,创建一个ParallelFileGroupTask对象,并且为每个处理器创建一个Thread

  1. int numThreads = Runtime.getRuntime().availableProcessors();
  2. Thread[] threads = new Thread[numThreads];
  3. ParallelGroupFileTask[] tasks = new ParallelGroupFileTask
  4. [numThreads];
  5. for (int i = 0; i < numThreads; i++) {
  6. tasks[i] = new ParallelGroupFileTask(fileName, parallelResult,
  7. directories);
  8. threads[i] = new Thread(tasks[i]);
  9. threads[i].start();
  10. }

最后,等待某个线程找到文件或者所有线程都完成执行为止。对于第一种情况,使用interrupt()方法和前面提到的机制取消其他线程的执行。使用Thread类的getState()方法检查各个线程是否已完成执行,如下所示:

  1. boolean finish = false;
  2. int numFinished = 0;
  3. while (!finish) {
  4. numFinished = 0;
  5. for (int i = 0; i < threads.length; i++) {
  6. if (threads[i].getState() == State.TERMINATED) {
  7. numFinished++;
  8. if (tasks[i].getFound()) {
  9. finish = true;
  10. }
  11. }
  12. }
  13. if (numFinished == threads.length) {
  14. finish = true;
  15. }
  16. }
  17. if (numFinished != threads.length) {
  18. for (Thread thread : threads) {
  19. thread.interrupt();
  20. }
  21. }
  22. }

2.3.4 对比解决方案

比较一下本节中实现的乘法器算法四个版本的解决方案(串行版和并发版)。为了测试该算法,我们使用JMH框架执行这些示例,该框架可支持用Java语言实现微基准测试。使用基准测试框架是一种很好的解决方案,它可以直接使用currentTimeMillis()nanoTime()等方法来计算时间。我们在两种不同架构中执行这些例子各10次。

  • 一台计算机配置有Intel Core i5-5300i处理器、Windows 7操作系统和16GB内存。该处理器有两个核,每个核可以执行两个线程,所以将有四个并行线程。
  • 另一台计算机配置有AMD A8-640处理器、Windows 10操作系统和8GB内存。该处理器有四个核。

在Windows目录下用两个不同的文件名测试算法:

  • hosts
  • yyy.yyy

我们已经在Windows操作系统上测试了算法。第一个文件存在而第二个文件不存在。如果你使用了其他操作系统,要相应地更改文件的名称。以下表格给出了以毫秒为单位的平均执行时间及其标准偏差。

算法范围AMDIntel
串行版hosts5869.019±124.5482955.535±69.252
yyy.yyy26 474.179±785.68014 508.276±195.725
并行版hosts2792.313±100.8851972.248±193.386
yyy.yyy21 337.288±954.34412 742.856±361.681

我们可以得出以下结论。

  • 这两种架构的性能有所区别,但是你必须考虑到它们的处理器、操作系统、内存和硬盘不同。
  • 在两种架构上得到的结果相同。并行算法的性能优于串行算法。对hosts文件的搜索来说,这种性能差异要比查找不存在的文件更大。

我们可以用搜索hosts文件性能最好的并发版本和串行版本求取加速比,以此来观察采用并发处理如何提高算法的性能。

\begin{aligned}&S_{{\rm AMD}}=\frac{T_{\rm serial}}{T_{\rm concurrent}}=\frac{5869.019}{2792.313}=2.10\\&S_{{\rm Intel}}=\frac{T_{\rm serial}}{T_{\rm concurrent}}=\frac{2955.535}{1972.248}=1.5\end{aligned}