3.2 第一个例子:k-最近邻算法

k-最邻近算法是一种用于监督分类的简单机器学习算法。该算法的主要组成部分如下所示。

  • 训练数据集:该数据集由实例构成,其中包括定义每个实例的一个或者多个属性,以及一个可确定实例标签的特殊属性。
  • 距离指标:该指标用于确定训练数据集的实例与你想要分类的新实例之间的距离(或者说相似度)。
  • 测试数据集:该数据集用于度量算法的行为。

对某个实例进行分类时,该算法计算该实例和训练数据集所有实例的距离。然后,选取k个距离最邻近的实例并且查看这些实例的标签。实例最多的标签将被指派为输入实例的标签。

在本章中,我们将采用UCI机器学习资源库(UCI Machine Learning Repository)的Bank Marketing数据集。为了度量实例之间的距离,我们将采用欧氏距离(Euclidean distance)。该指标要求实例的所有属性必须有数值。Bank Marketing数据集的一些属性是“类别型的”,也就是说,这些属性可以从一些预定义值中取值,这样就不能直接对该数据集使用欧氏距离。可以为每个类别型的值指派一个序号。例如,对于婚姻状况来说,可用0代表单身,1代表已婚,2代表离婚。然而,这可能意味着离婚的人与已婚的人之间的距离要比其与单身的人之间的距离更近,而这一点也值得商榷。如果使所有的类别型取值的距离相同,还要为此单独创建属性,例如已婚单身离婚,而每个属性都只有两个值:0()和1()。

数据集有66个属性和两个可能的标签:。我们还将数据划分成如下两个子集。

  • 训练数据集:有39 129个实例。
  • 测试数据集:有2059个实例。

正如第1章中所述,我们首先实现了该算法的串行版本。然后,寻找该算法中可以进行并行处理的部分,之后采用执行器框架执行并发任务。在下面几节中,我们将剖析k-最近邻算法的串行版本和两个不同的并发版本,其中第一个并发版本具有非常细的粒度,而第二个并发版本则具有较粗的粒度。

3.2.1 k-最近邻算法:串行版本

我们在KnnClassifier类中实现k-最近邻算法的串行版本。该类内存储了训练数据集和数值k(用于确定某个实例标签的范例数量)。

  1. public class KnnClassifier {
  2. private final List <? extends Sample>dataSet;
  3. private int k;
  4. public KnnClassifier(List <? extends Sample>dataSet, int k) {
  5. this.dataSet=dataSet;
  6. this.k=k;
  7. }

KnnClassifier类仅实现了一个名为classify的方法,该方法接收一个Sample对象作为参数,而该对象中含待分类的实例;classify方法返回一个字符串,其中含有要指派给该实例的标签。

  1. public String classify (Sample example) {

该方法包括三个主要的部分。首先,计算范例和训练集所有范例之间的距离。

  1. Distance[] distances=new Distance[dataSet.size()];
  2. int index=0;
  3. for (Sample localExample : dataSet) {
  4. distances[index]=new Distance();
  5. distances[index].setIndex(index);
  6. distances[index].setDistance(EuclideanDistanceCalculator
  7. .calculate(localExample, example));
  8. index++;
  9. }

其次,使用Arrays.sort()方法按照距离从低到高的顺序排列范例。

  1. Arrays.sort(distances);

最后,我们在k个最邻近的范例中统计实例最多的标签。

  1. Map<String, Integer> results = new HashMap<>();
  2. for (int i = 0; i < k; i++) {
  3. Sample localExample = dataSet.get(distances[i].getIndex());
  4. String tag = localExample.getTag();
  5. results.merge(tag, 1, (a, b) ->a+b);
  6. }
  7. return Collections.max(results.entrySet(),
  8. Map.Entry.comparingByValue()).getKey();
  9. }

为了计算两个范例之间的距离,可以使用以辅助类形式实现的欧氏距离。该类的代码如下:

  1. public class EuclideanDistanceCalculator {
  2. public static double calculate (Sample example1, Sample example2) {
  3. double ret=0.0d;
  4. double[] data1=example1.getExample();
  5. double[] data2=example2.getExample();
  6. if (data1.length!=data2.length) {
  7. throw new IllegalArgumentException ("Vector doesn't have
  8. the same length");
  9. }
  10. for (int i=0; i<data1.length; i++) {
  11. ret+=Math.pow(data1[i]-data2[i], 2);
  12. }
  13. return Math.sqrt(ret);
  14. }
  15. }

我们还可以用Distance类存放Sample输入和训练数据集中某一实例之间的距离。该类只有两个属性:训练集范例的索引和它到输入范例的距离。此外,该类还采用Arrays.sort()方法实现了Comparable接口。Sample类中存放了一个实例。它只有一个双精度型数组和一个含有该实例标签的字符串。

3.2.2 k-最近邻算法:细粒度并发版本

如果你分析一下k-最近邻算法的串行版本,就会发现在如下两处可以进行算法的并行处理。

  • 距离的计算:在每次循环迭代中都会计算输入范例和训练集某个范例之间的距离,而每次迭代均独立于其他各次迭代。
  • 距离的排序:Java 8在Array类中引入了parallelSort()方法,可以使用并行方式对数组进行排序。

在算法的第一个并行版本中,我们为待计算范例间的每个距离创建一个任务,也使距离数组的并发排序成为可能。我们在一个名为KnnClassifierParrallelIndividual的类中实现了这一版本的算法。该类中存放了训练数据集、参数k、执行并行任务的ThreadPoolExecutor对象、一个用于存放执行器中工作线程数的属性,以及一个用于指定是否要进行并行排序的属性。

我们将创建一个线程数固定的执行器,这样就可以控制该执行器将要使用的系统资源。这个数值可通过系统中可用处理器的数目(用Runtime类的availableProcessors()方法获得)乘以构造函数中参数factor的值得到。factor的值就是你从处理器获得的线程数。我们总是使用数值1,不过你也可以测试一下其他值并且对比结果。下面是分类算法的构造函数:

  1. public class KnnClassifierParallelIndividual {
  2. private final List<? extends Sample>dataSet;
  3. private final int k;
  4. private final ThreadPoolExecutor executor;
  5. private final int numThreads;
  6. private final boolean parallelSort;
  7. public KnnClassifierParallelIndividual(List<? extends Sample>dataSet,
  8. int k, int factor,
  9. booleanparallelSort) {
  10. this.dataSet=dataSet;
  11. this.k=k;
  12. numThreads=factor* (Runtime.getRuntime().availableProcessors());
  13. executor=(ThreadPoolExecutor)Executors
  14. .newFixedThreadPool(numThreads);
  15. this.parallelSort=parallelSort;
  16. }

要创建执行器,我们要使用Executors工具类及其newFixedThreadPool()方法。该方法接收的是你打算在执行器中使用的工作线程数。执行器的工作线程数绝不会超过你在该构造函数中指定的数目。该方法返回一个ExecutorService对象,但是我们将其强制类型转换为一个ThreadPoolExecutor对象,以便访问那些在ThreadPoolExecutor类中提供但是在ExecutorService接口中没有提供的方法。

该类还实现了classify()方法,它接收一个范例作为参数并且返回一个字符串。

首先,为每个需要计算的距离创建一个任务,并且将其发送给执行器。然后,主线程等待这些任务执行结束。为了控制该完成过程,我们使用了Java并发API提供的一种同步机制:CountDownLatch类。该类允许一个线程一直等待,直到其他线程到达其代码的某一确定点。该类需要使用等待线程数进行初始化,它实现了以下两种方法。

  • getDown():该方法用于减少要等待的线程数。
  • await():该方法挂起调用它的线程,直到计数器达到0为止。

在本例中,我们使用将在执行器中执行的任务数初始化CountDownLatch类。主线程为其调用await()方法,而每个任务完成其计算时调用getDown()方法:

  1. public String classify (Sample example) throws Exception {
  2. Distance[] distances=new Distance[dataSet.size()];
  3. CountDownLatchendController=new CountDownLatch(dataSet.size());
  4. int index=0;
  5. for (Sample localExample : dataSet) {
  6. IndividualDistanceTask task=new IndividualDistanceTask(distances,
  7. index, localExample, example, endController);
  8. executor.execute(task);
  9. index++;
  10. }
  11. endController.await();

然后,根据parallelSort属性的值,调用Arrays.sort()方法或者Arrays.parallelSort()方法。

  1. if (parallelSort) {
  2. Arrays.parallelSort(distances);
  3. } else {
  4. Arrays.sort(distances);
  5. }

最后,计算指派给输入范例的标签。这一部分代码与串行版本相同。

KnnClassifierParallelIndividual类还包含一个用于关闭执行器的方法,该方法调用了shutdown()方法。如果你不调用该方法,应用程序就不会结束,因为执行器所创建的线程仍然存在,并且在等待处理新任务。在此之前提交的任务已执行完毕,而新提交的任务会被拒绝。该方法并不会等待执行器完成,它会立即返回,如下所示。

  1. public void destroy() {
  2. executor.shutdown();
  3. }

本例的关键环节就是IndividualDistanceTask类。该类将输入范例与训练数据集中某个范例之间的距离作为一项并发任务计算。它存放了一个完整的距离数组(我们将只确立其中一个位置的值)、训练数据集中范例的索引、这两个范例和用于控制任务结束的CountDownLatch对象。IndividualDistanceTask类实现了Runnable接口,因此可以在执行器中执行。该类的构造函数如下:

  1. public class IndividualDistanceTask implements Runnable {
  2. private final Distance[] distances;
  3. private final int index;
  4. private final Sample localExample;
  5. private final Sample example;
  6. private final CountDownLatchendController;
  7. public IndividualDistanceTask(Distance[] distances, int index, Sample
  8. localExample,Sample example,
  9. CountDownLatchendController) {
  10. this.distances=distances;
  11. this.index=index;
  12. this.localExample=localExample;
  13. this.example=example;
  14. this.endController=endController;
  15. }

run()方法采用前面提到的EuclideanDistanceCalculator类计算了两个范例之间的距离,并且将结果存放在distances数组的对应位置中:

  1. @Override
  2. public void run() {
  3. distances[index] = new Distance();
  4. distances[index].setIndex(index);
  5. distances[index].setDistance(EuclideanDistanceCalculator
  6. .calculate(localExample, example));
  7. endController.countDown();
  8. }

3.2 第一个例子:k-最近邻算法 - 图1 请注意,尽管所有任务共享distances数组,但是我们并不需要采用任何同步机制,因为每个任务只会修改该数组的不同位置。

3.2.3 k-最近邻算法:粗粒度并发版本

上一节中给出的并发解决方案可能存在一定问题:执行的任务太多了。想想看,在这个例子中,我们有29 000多个训练范例,对于每个待分类范例需要启动29 000个任务。另一方面,我们已经创建的执行器最大工作线程数为numThreads。因此,另一个解决方案是仅启动numThreads个任务,并且将训练数据集划分为numThreads个组。比如,使用一个四核处理器执行这个例子,这样每个任务将要计算输入范例与大约7000个训练范例之间的距离。

我们已在KnnClassifierParallelGroup类中实现了该解决方案。它与KnnClassifierParallelIndividual类非常相似,但是存在两个主要区别。首先是classify()方法的初始化部分。现在,我们只有numThreads个任务,而且必须将训练数据集划分为numThreads个子集。

  1. public String classify(Sample example) throws Exception {
  2. Distance distances[] = new Distance[dataSet.size()];
  3. CountDownLatchendController = new CountDownLatch(numThreads);
  4. int length = dataSet.size() / numThreads;
  5. intstartIndex = 0, endIndex = length;
  6. for (int i = 0; i <numThreads; i++) {
  7. GroupDistanceTask task = new GroupDistanceTask(distances, startIndex,
  8. endIndex, dataSet, example, endController);
  9. startIndex = endIndex;
  10. if (i <numThreads - 2) {
  11. endIndex = endIndex + length;
  12. } else {
  13. endIndex = dataSet.size();
  14. }
  15. executor.execute(task);
  16. }
  17. endController.await();

计算每个任务的样本数量并存放在变量length中。然后,为每个线程指派待处理样本的开始索引和结束索引。除最后一个线程外,均时使用length值加上开始索引计算结束索引。对于最后一个线程而言,最后的索引值即为数据集的大小。

其次,该类使用GroupDistanceTask代替了IndividualDistanceTask。这两个类之间的主要区别在于前一个类处理的是训练数据集的一个子集,因此它存放的是整个训练数据集及其要处理的这部分数据集的起始位置和终止位置。

  1. public class GroupDistanceTask implements Runnable {
  2. private final Distance[] distances;
  3. private final intstartIndex, endIndex;
  4. private final Example example;
  5. private final List<? extends Example>dataSet;
  6. private final CountDownLatchendController;
  7. public GroupDistanceTask(Distance[] distances, intstartIndex,
  8. intendIndex, List<? extends Example>dataSet,
  9. Example example, CountDownLatchendController) {
  10. this.distances = distances;
  11. this.startIndex = startIndex;
  12. this.endIndex = endIndex;
  13. this.example = example;
  14. this.dataSet = dataSet;
  15. this.endController = endController;
  16. }

run()方法处理的是一个范例集合,而不仅仅是一个范例。

  1. public void run() {
  2. for (int index = startIndex; index <endIndex; index++) {
  3. Sample localExample=dataSet.get(index);
  4. distances[index] = new Distance();
  5. distances[index].setIndex(index);
  6. distances[index].setDistance(EuclideanDistanceCalculator
  7. .calculate(localExample, example));
  8. }
  9. endController.countDown();
  10. }

3.2.4 对比解决方案

对比一下已经实现的k-最近邻算法的不同版本。我们有如下五个不同版本。

  • 串行版本。
  • 采用串行排序的细粒度并发版本。
  • 采用并发排序的细粒度并发版本。
  • 采用串行排序的粗粒度并发版本。
  • 采用并发排序的粗粒度并发版本。

为了测试该算法,从Bank Marketing数据集中选取了2059个测试实例。分别在k取值为10、30和50的情况下采用上述五个版本的算法对上述所有实例进行分类,并且度量各个版本的执行时间。我们采用JMH框架执行上述各例,该框架支持用Java实现微型基准测试。使用一个面向基准测试的框架是一种比较好的解决方案,使用其中的currentTimeMillis()方法或者nanoTime()方法就可以测量时间。我们在两套架构上分别将其执行10次。

  • 一台计算机配置了Intel Core i5-5300 CPU、Windows 7操作系统和16GB的RAM。该处理器有两个核,每个核可以执行两个线程,这样我们就有了四个并行线程。
  • 另一台计算机配置了AMD A8-640 CPU、Windows 10操作系统和8GB的RAM。该处理器有四个核。

执行时间如下所示(单位:秒)。

算法kAMDIntel
串行10309.99126.26
30310.22125.65
50309.59126.48
细粒度串行排序10153.1989.97
30152.8590.61
50155.0189.97
细粒度并发排序10120.1076.81
30122.0076.69
50125.6173.33
粗粒度串行排序10138.2877.99
30137.5478.69
50137.8578.25
粗粒度并发排序10107.6266.48
30107.3665.93
50106.6166.22

我们可以得出以下结论。

  • 参数值k(10、30和50)的选择对算法的执行时间并无影响。对于这三个取值来说,五个版本在两套架构上都表现出相似的结果。
  • 正如我们所期望的那样,使用Arrays.parallelSort()方法进行并发排序,该算法的细粒度并发版和粗粒度并发版在性能上都会有显著提升。
  • 各并发版本都提升了应用程序的性能,但是采用串行或并行排序的粗粒度版本其性能实现了较大提升。

因此,如果与串行版本比较求取加速比,那么该算法的最好版本是采用并行排序的粗粒度解决方案。

S=\frac{T_{{\rm serial}}}{T_{{\rm concurrent}}}=\frac{99.218}{53.255}=1.86

这个例子说明,选择一个良好的并发解决方案可以带来巨大的性能提升,反之则相反。