7.3 第二个例子:数据筛选算法

假设有大量描述某个项列表的数据。例如,假设有关于很多人的很多属性(姓名、姓氏、地址、电话号码等)。通常需要获得满足特定标准的数据。例如,想要获得在某一街道居住的人或者叫某个特定名字的人。

本节,你将实现这样一个筛选程序。我们采用了来自UCI的Census-Income KDD数据集,该数据集包含了1994年到1995年从美国人口普查局的人口普查结果中抽取的加权人口普查数据。

在本例的并发版本中,你将学会如何撤销在Fork/Join池中运行的任务,以及如何管理在任务中抛出的未校验异常。

7.3.1 公共特性

我们实现了一些类来读取文件数据并且进行筛选。这些类在算法的串行版本和并发版本中都会用到,具体如下。

  • CensusData类:该类存储了39个用于定义人员的属性。该类定义了这些属性以及获取和设置这些属性值的方法。我们将通过编号标识每个属性。该类的evaluateFilter()方法包含了属性名称与属性编号之间的关系。
  • CensusDataLoader类:该类从一个文件中加载人口普查数据。该类有一个load()方法,该方法将文件的路径作为输入参数,返回一个含有文件中所有人员信息的CensusData数组。
  • FilterData类:该类定义了一个数据筛选器。筛选器包括一个属性的编号和该属性的值。
  • Filter类:该类实现了一些方法来判定一个CensusData对象是否满足一个筛选器列表所设定的条件。

这里不再介绍这些类的源代码,它们都非常简单,你可以查看本例源代码的详情。

7.3.2 串行版

我们在两个类中实现了筛选算法的串行版本。SerialSearch类进行数据的筛选,该类提供了两个方法。

  • findAny()方法:该方法接收CensusData对象数组作为参数,其中有来自文件的数据和一个筛选器列表,而且该方法返回一个CensusData对象,其中含有第一个满足筛选器规定标准的人员。
  • findAll()方法:该方法接收CensusData对象数组作为参数,其中有来自文件的数据和一个筛选器列表,而且该方法返回一个CensusData对象数组,其中含有所有满足筛选器规定标准的人员。

SerialMain类实现了该版本程序的main()方法,并且进行程序测试,测量了该算法一些情况下的执行时间。

  • SerialSearch

如前所述,该类实现了数据筛选功能。它提供了两个方法,第一个是findAny()方法,用于查找满足筛选器条件的第一个数据对象。该方法找到第一个数据对象时,其执行完成。相关代码如下:

  1. public class SerialSearch {
  2. public static CensusData findAny (CensusData[] data, List<FilterData>
  3. filters) {
  4. int index=0;
  5. for (CensusData censusData : data) {
  6. if (Filter.filter(censusData, filters)) {
  7. System.out.println("Found: "+index);
  8. return censusData;
  9. }
  10. index++;
  11. }
  12. return null;
  13. }

第二个是findAll()方法,该方法返回一个CensusData对象数组,其中含有满足筛选标准的所有对象,具体如下:

  1. public static List<CensusData> findAll (CensusData[] data,
  2. List<FilterData> filters) {
  3. List<CensusData> results=new ArrayList<CensusData>();
  4. for (CensusData censusData : data) {
  5. if (Filter.filter(censusData, filters)) {
  6. results.add(censusData);
  7. }
  8. }
  9. return results;
  10. }
  11. }
  • SerialMain

你将在不同情况下使用该类测试筛选算法。首先,从文件加载数据,如下所示:

  1. public class SerialMain {
  2. public static void main(String[] args) {
  3. Path path = Paths.get("data","census-income.data");
  4. CensusData data[]=CensusDataLoader.load(path);
  5. System.out.println("Number of items: "+data.length);
  6. Date start, end;

我们要做的第一件事是使用findAny()方法查找出现在数组中第一个位置的对象。可以构建一个筛选器列表,然后调用findAny()方法,该方法的参数为文件中的数据和筛选器列表。

  1. List<FilterData> filters=new ArrayList<>();
  2. FilterData filter=new FilterData();
  3. filter.setIdField(32);
  4. filter.setValue("Dominican-Republic");
  5. filters.add(filter);
  6. filter=new FilterData();
  7. filter.setIdField(31);
  8. filter.setValue("Dominican-Republic");
  9. filters.add(filter);
  10. filter=new FilterData();
  11. filter.setIdField(1);
  12. filter.setValue("Not in universe");
  13. filters.add(filter);
  14. filter=new FilterData();
  15. filter.setIdField(14);
  16. filter.setValue("Not in universe");
  17. filters.add(filter);
  18. start=new Date();
  19. CensusData result=SerialSearch.findAny(data, filters);
  20. System.out.println("Test 1 - Result: "+result
  21. .getReasonForUnemployment());
  22. end=new Date();
  23. System.out.println("Test 1- Execution Time: "+(end.getTime()-
  24. start.getTime()));

筛选器根据下述属性进行查找。

  • 32:这是生父的国籍属性。
  • 31:这是生母的国籍属性。
  • 1:这是工作类别属性,其中一个可能值是Not in universe
  • 14:这是失业原因属性,其中一个可能值是Not in universe
    我们将按照如下方式测试其他用例。

  • 使用findAny()方法查找出现在数组中最后一个位置的对象。

  • 使用findAny()方法尝试查找某个并不存在的对象。
  • 在错误情境中使用findAny()方法。
  • 使用findAll()方法获取满足筛选器列表条件的所有对象。
  • 在错误情境中使用findAll()方法。

7.3.3 并发版本

我们将在并发版本中引入更多要素。

  • 任务管理器:使用Fork/Join框架时,从一个任务开始,并且将该任务分割成两个或者更多子任务,之后再一次次分割,直到问题达到你想要的规模为止。有些情况下,需要结束所有任务。例如,实现findAny()方法并且找到了一个满足所有条件的对象时,就不需要继续执行剩下的任务了。
  • 用于实现findAny()方法的RecursiveTask类:该类是扩展了RecursiveTask类的IndividualTask类。
  • 用于实现findAll()方法的RecursiveTask类:该类是扩展了RecursiveTask类的ListTask类。

下面详细了解一下这些类。

  • TaskManager

我们将使用该类来控制任务的撤销。我们将在下述两种情况中撤销任务的执行。

  • 正在执行findAny()操作并且找到了满足要求的对象。
  • 正在执行findAny()findAll()操作并且在某个任务中出现了一个未校验异常。
    该类声明了两个属性:一个是用于存放所有待撤销任务的ConcurrentLinkedDeque,另一个是用于保证只有一个任务执行cancelTasks()方法的AtomicBoolean变量。使用AtomicBoolean变量确保所有任务都能以线程安全的方式访问它们的值。
  1. public class TaskManager {
  2. private Set<RecursiveTask> tasks;
  3. private AtomicBoolean cancelled;
  4. public TaskManager() {
  5. tasks = ConcurrentHashMap.newKeySet();
  6. cancelled = new AtomicBoolean(false);
  7. }

该类定义了向ConcurrentLinkedDeque添加某个任务的方法,从ConcurrentLinkedDeque中删除某个任务的方法,以及撤销存放在ConcurrentLinkedDeque中所有任务的方法。要撤销这些任务,我们使用在ForkJoinTask类中定义的cancel()方法。该方法的参数为true时会强制中断运行中的任务,如下所示:

  1. public void addTask(RecursiveTask task) {
  2. tasks.add(task);
  3. }
  4. public void cancelTasks(RecursiveTask sourceTask) {
  5. if (cancelled.compareAndSet(false, true)) {
  6. for (RecursiveTask task : tasks) {
  7. if (task != sourceTask) {
  8. if(cancelled.get()) {
  9. task.cancel(true);
  10. }
  11. else {
  12. tasks.add(task);
  13. }
  14. }
  15. }
  16. }
  17. }
  18. public void deleteTask(RecursiveTask task) {
  19. tasks.remove(task);
  20. }

cancelTasks()方法接收一个RecursiveTask对象作为参数。除了调用该方法的任务之外,我们将撤销所有其他任务。我们不想撤销已经找到结果的任务。compareAndSet(false, true)方法将AtomicBoolean变量设置为true,而且当且仅当该变量的当前值为false时才会返回true值。如果AtomicBoolean变量已经有一个true值,那么该方法将返回false值。整个操作都以原子方式执行,因此即使cancelTasks()方法被不同线程同时调用多次,也能够保证if语句的主体部分最多执行一次。

  • IndividualTask

IndividualTask类扩展了RecursiveTask类,以CensusData任务为参数,并且实现了findAny()操作。该类定义了如下属性。

  • 一个含有所有CensusData对象的数组。
  • Start属性和end属性,它们确定了要处理的元素。
  • size属性,它确定了在无须分割任务的前提下所处理的最大元素数。
  • TaskManager类,它用于在必要之时撤销任务。
    下面的代码给出了一个将要应用的筛选器列表。
  1. private CensusData[] data;
  2. private int start, end, size;
  3. private TaskManager manager;
  4. private List<FilterData> filters;
  5. public IndividualTask(CensusData[] data, int start,
  6. int end, TaskManager manager,
  7. int size, List<FilterData> filters) {
  8. this.data = data;
  9. this.start = start;
  10. this.end = end;
  11. this.manager = manager;
  12. this.size = size;
  13. this.filters = filters;
  14. }

该类中的主方法是compute()方法。该方法返回一个CensusData对象。如果任务需要处理的元素数比size属性值小,该方法直接进行对象查找。如果该方法找到了想要的对象,那么它将返回该对象并且使用cancelTasks()方法撤销剩余任务的执行。如果该方法没有找到想要的对象,那么它将返回null值,如下所示:

  1. if (end - start <= size) {
  2. for (int i = start; i < end && ! Thread.currentThread()
  3. .isInterrupted(); i++) {
  4. CensusData censusData = data[i];
  5. if (Filter.filter(censusData, filters)) {
  6. System.out.println("Found: " + i);
  7. manager.cancelTasks(this);
  8. return censusData;
  9. }
  10. }
  11. return null;
  12. }

如果要处理的项数要比size属性规定的多,那么要创建两个子任务来分别处理其中的一半元素。

  1. } else {
  2. int mid = (start + end) / 2;
  3. IndividualTask task1 = new IndividualTask(data, start, mid, manager,
  4. size, filters);
  5. IndividualTask task2 = new IndividualTask(data, mid, end, manager,
  6. size, filters);

然后,向任务管理器添加新创建的任务,并且删除实际任务。如果要撤销任务,即指仅撤销正在运行的任务。

  1. manager.addTask(task1);
  2. manager.addTask(task2);
  3. manager.deleteTask(this);

接着,使用fork()方法以异步方式将任务发送给ForkJoinPool,并且使用quietlyJoin()方法等待其执行结束。

join()方法和quietlyJoin()方法之间的区别在于,join()启动之后,如果任务撤销,将抛出异常,或者在方法内部抛出一个未校验异常,而quietlyJoin()方法则不抛出任何异常。

  1. task1.fork();
  2. task2.fork();
  3. task1.quietlyJoin();
  4. task2.quietlyJoin();

然后,从TaskManager类中删除子任务,如下所示:

  1. manager.deleteTask(task1);
  2. manager.deleteTask(task2);

现在,使用join()方法获取任务的结果。如果一个任务抛出一个未校验异常,那么该异常将被传播而无须特殊处理,而撤销操作则直接被忽略,如下所示:

  1. try {
  2. CensusData res = task1.join();
  3. if (res != null)
  4. return res;
  5. manager.deleteTask(task1);
  6. } catch (CancellationException ex) {
  7. }
  8. try {
  9. CensusData res = task2.join();
  10. if (res != null)
  11. return res;
  12. manager.deleteTask(task2);
  13. } catch (CancellationException ex) {
  14. }
  15. return null;
  16. }
  17. }
  • ListTask

ListTask类扩展了RecursiveTask类,采用一个CensusData对象列表作为参数。我们将使用该任务来实现findAll()操作。该任务和IndividualTask任务非常相似,都使用相同的属性,但是它们在compute()方法上有所不同。

首先,初始化一个List对象以返回结果并且校验任务要处理的元素数量。如果任务要处理的元素数量小于size属性,将满足筛选器指定标准的所有对象添加到结果列表中。

  1. @Override
  2. protected List<CensusData> compute() {
  3. List<CensusData> ret = new ArrayList<CensusData>();
  4. List<CensusData> tmp;
  5. if (end - start <= size) {
  6. for (int i = start; i < end; i++) {
  7. CensusData censusData = data[i];
  8. if (Filter.filter(censusData, filters)) {
  9. ret.add(censusData);
  10. }
  11. }

如果要处理的项数多于size属性,将创建两个子任务来处理其中各一半的元素。

  1. int mid = (start + end) / 2;
  2. ListTask task1 = new ListTask(data, start, mid, manager, size,
  3. filters);
  4. ListTask task2 = new ListTask(data, mid, end, manager, size, filters);

然后,将新创建的任务添加到任务管理器并且删除原来的实际任务。该实际任务并不会被撤销,而其子任务会被撤销,如下所示:

  1. manager.addTask(task1);
  2. manager.addTask(task2);
  3. manager.deleteTask(this);

然后,使用fork()方法以异步方式将任务发送给ForkJoinPool,并且使用quietlyJoin()方法等待其执行结束。

  1. task1.fork();
  2. task2.fork();
  3. task2.quietlyJoin();
  4. task1.quietlyJoin();

然后,从TaskManager中删除子任务。

  1. manager.deleteTask(task1);
  2. manager.deleteTask(task2);

现在,使用join()方法获取任务结果。如果一个任务抛出了未校验异常,那么它将被传播而不经特殊处理,并且会直接忽略撤销操作。

  1. try {
  2. tmp = task1.join();
  3. if (tmp != null)
  4. ret.addAll(tmp);
  5. manager.deleteTask(task1);
  6. } catch (CancellationException ex) {
  7. }
  8. try {
  9. tmp = task2.join();
  10. if (tmp != null)
  11. ret.addAll(tmp);
  12. manager.deleteTask(task2);
  13. } catch (CancellationException ex) {
  14. }
  15. }
  16. }
  • ConcurrentSearch

ConcurrentSearch类实现了findAny()findAll()方法。这两个方法与串行版本的相应方法有着相同的接口。从内部来看,它们初始化TaskManager对象和第一个任务,并且使用execute方法将其发送给默认的ForkJoinPool,然后等待任务结束并且输出结果。下面是findAny()方法的代码。

  1. public class ConcurrentSearch {
  2. public static CensusData findAny (CensusData[] data,
  3. List<FilterData> filters, int size) {
  4. TaskManager manager=new TaskManager();
  5. IndividualTask task=new IndividualTask(data, 0, data.length,
  6. manager, size, filters);
  7. ForkJoinPool.commonPool().execute(task);
  8. try {
  9. CensusData result=task.join();
  10. if (result!=null) {
  11. System.out.println("Find Any Result: "+result.getCitizenship());
  12. return result;
  13. } catch (Exception e) {
  14. System.err.println("findAny has finished with an error: "+
  15. task.getException().getMessage());
  16. }
  17. return null;
  18. }

下面是findAll()方法的代码。

  1. public static CensusData[] findAll (CensusData[] data,
  2. List<FilterData> filters, int size) {
  3. List<CensusData> results;
  4. TaskManager manager=new TaskManager();
  5. ListTask task=new ListTask(data,0,data.length,manager,
  6. size,filters);
  7. ForkJoinPool.commonPool().execute(task);
  8. try {
  9. results=task.join();
  10. return results;
  11. } catch (Exception e) {
  12. System.err.println("findAny has finished with an
  13. error: " + task.getException().getMessage());
  14. }
  15. return null;
  16. }
  • ConcurrentMain

ConcurrentMain类用于测试目标筛选器的并发版本。它和SerialMain类似,只是采用了各种并发版本的操作。

7.3.4 对比两个版本

为了比较筛选算法的串行版本和并发版本,我们分六种情形进行测试。

  • 测试1:测试findAny()方法,查找一个对象,它在CensusData数组中的第一个位置。
  • 测试2:测试findAny()方法,查找一个对象,它在CensusData数组的最后一个位置。
  • 测试3:测试findAny()方法,查找一个不存在的对象。
  • 测试4:在错误情况下测试findAny()方法。
  • 测试5:在正常情况下测试findAll()方法。
  • 测试6:在错误情况下测试findAll()方法。

对于算法的并发版本,我们测试了size参数的3组不同取值,该参数确定了一个任务在不分割成两个子任务时所能处理的最大元素数。测试使用的最大阈值为10、200、2000和4000个元素。

采用JMH框架执行这些示例,该框架允许在Java中实现微型基准测试。使用面向基准测试的框架是比较好的解决方案,它直接用currentTimeMillis()方法或者nanoTime()方法度量时间。在两种不同的架构上分别执行这些示例10次。

  • 一台计算机配置了Intel Core i5-5300处理器、Windows 7操作系统和16GB的RAM。该处理器有两个核,且每个核可以执行两个线程,这样就有四个并行线程。
  • 另一台计算机配置了AMD A8-640处理器、Windows 10操作系统和8GB的RAM。该处理器有四个核。

与其他例子相同,以毫秒为单位度量执行时间。首先给出在AMD架构上的运行结果。

AMD架构
测试用例串行版本并发版本 规模=10并发版本 规模=200并发版本 规模=2000并发版本 规模=4000最优
测试12.3748.0415.4344.8029.339串行版本
测试286.04975.87257.95432.5632.876并发版本
测试358.32270.56222.94730.83127.033并发版本
测试40.6515 090.17259.5978.5855.987串行版本
测试560.12942.97944.8122.74121.287并发版本
测试60.69714 279.35256.2719.3654.842串行版本

在Intel架构上运行的结果如下。

Intel架构
测试用例串行版本并发版本 规模=10并发版本 规模=200并发版本 规模=2000并发版本 规模=4000最优
测试10.7968.8963.2532.082.422串行版本
测试231 00641.31232.97414.40714.55并发版本
测试315.07625.0689.5510.7299.77并发版本
测试40.37810 664.607106.3494.6992.898串行版本
测试513.29118.03725.06110.2628.937并发版本
测试60.35210 901.38791.9985.2462.24串行版本

根据上述表格,可以得出如下结论。

  • 处理相对少量的元素时,算法的串行版本具有更好的性能。
  • 处理所有元素或者其中的一部分时,算法的并发版本具有更好的性能。
  • 在错误情况下,算法的串行版本要比并发版本的性能更好。当size参数的值较小时,并发版本在这种情况下性能非常糟糕。

在这种情况下,并发处理并不会总能提升性能。