6.6 开发你自己的收集器以获得更好的性能
在6.4节讨论分区的时候,我们用Collectors类提供的一个方便的工厂方法创建了一个收集器,它将前
个自然数划分为质数和非质数,如下所示。
代码清单 6-6 将前
个自然数按质数和非质数分区
public Map<Boolean, List<Integer>> partitionPrimes(int n) {return IntStream.rangeClosed(2, n).boxed().collect(partitioningBy(candidate -> isPrime(candidate));}
当时,通过限制除数不超过被测试数的平方根,我们对最初的isPrime方法做了一些改进:
public boolean isPrime(int candidate) {int candidateRoot = (int) Math.sqrt((double) candidate);return IntStream.rangeClosed(2, candidateRoot).noneMatch(i -> candidate % i == 0);}
还有没有办法来获得更好的性能呢?答案是“有”,但为此你必须开发一个自定义收集器。
6.6.1 仅用质数做除数
一个可能的优化是仅看被测试数是不是能够被质数整除。要是除数本身都不是质数就用不着测了。所以我们可以仅用被测试数之前的质数来测试。然而我们目前所见的预定义收集器的问题,也就是必须自己开发一个收集器的原因在于,在收集过程中是没有办法访问部分结果的。这意味着,当测试某一个数字是否是质数的时候,你没法访问目前已经找到的其他质数的列表。
假设你有这个列表,那就可以把它传给isPrime方法,将方法重写如下:
public static boolean isPrime(List<Integer> primes, int candidate) {return primes.stream().noneMatch(i -> candidate % i == 0);}
而且还应该应用先前的优化,仅仅用小于被测数平方根的质数来测试。因此,你需要想办法在下一个质数大于被测数平方根时立即停止测试。可以使用Stream的takeWhile的方法:
public static boolean isPrime(List<Integer> primes, int candidate){int candidateRoot = (int) Math.sqrt((double) candidate);return primes.stream().takeWhile(i -> i <= candidateRoot).noneMatch(i -> candidate % i == 0);}
测验6.3:用Java 8模拟
takeWhileJava 9引入了
takeWhile方法,如果你用的还是Java 8,那么非常不幸,你无法使用这种解决方案。怎样避免这种局限,用Java 8提供的功能实现类似的效果呢?答案:你可以实现自己的
takeWhile方法,它接受一个排序列表和一个谓词,返回列表元素中符合该谓词条件的最长子列表,代码如下所示:
public static <A> List<A> takeWhile(List<A> list, Predicate<A> p) {int i = 0;for (A item : list) {if (!p.test(item)) { ←—— 检查列表中的当前元素是否符合谓词的约束return list.subList(0, i); ←—— 如果当前元素不符合谓词要求,返回测试元素的前序子列表}i++;}return list; ←—— 列表中的所有元素都符合该谓词时,返回该列表}采用这种方式,你可以重写
isPrime方法,只对那些不大于其平方根的候选素数进行测试:
public static boolean isPrime(List<Integer> primes, int candidate){int candidateRoot = (int) Math.sqrt((double) candidate);return takeWhile(primes, i -> i <= candidateRoot).stream().noneMatch(p -> candidate % p == 0);}注意,与Stream API提供的版本不同,采用这种方式实现的版本是即时的。理想情况下,我们更希望采用Java 9那种由Stream提供的
takeWhile,它具有延迟求值的特性,还能结合noneMatch来操作。
有了这个新的isPrime方法在手,你就可以实现自己的自定义收集器了。首先你需要声明一个实现Collector接口的新类,接着要实现Collector接口所需的五个方法。
- 第1步:定义
Collector类的签名
让我们从类签名开始吧,记得Collector接口的定义是:
public interface Collector<T, A, R>
其中T、A和R分别是流中元素的类型、用于累积部分结果的对象类型,以及collect操作最终结果的类型。这里应该收集Integer流,而累加器和结果类型则都是Map(和先前代码清单6-6中分区操作得到的结果Map相同),键是true和false,值则分别是质数和非质数的List:
public class PrimeNumbersCollectorimplements Collector<Integer, ←---- 流中元素的类型Map<Boolean, List<Integer>>, ←---- 累加器类型Map<Boolean, List<Integer>>> ←---- collect操作的结果类型
- 第2步:实现归约过程
接下来,你需要实现Collector接口中声明的五个方法。supplier方法会返回一个在调用时创建累加器的函数:
public Supplier<Map<Boolean, List<Integer>>> supplier() {return () -> new HashMap<Boolean, List<Integer>>() {{put(true, new ArrayList<Integer>());put(false, new ArrayList<Integer>());}};}
这里不但创建了用作累加器的Map,还为true和false两个键初始化了对应的空列表。在收集过程中会把质数和非质数分别添加到这里。收集器中最重要的方法是accumulator,因为它定义了如何收集流中元素的逻辑。这里它也是实现前面所讲的优化的关键。现在在任何一次迭代中,都可以访问收集过程的部分结果,也就是包含迄今找到的质数的累加器:
public BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {return (Map<Boolean, List<Integer>> acc, Integer candidate) -> {acc.get( isPrime(acc.get(true), candidate) ) ←---- 根据isPrime的结果,获取质数或非质数列表.add(candidate); ←---- 将被测数添加到相应的列表中};}
在这个方法中,你调用了isPrime方法,将待测试是否为质数的数以及迄今找到的质数列表(也就是累积Map中true键对应的值)传递给它。这次调用的结果随后被用作获取质数或非质数列表的键,这样就可以把新的被测数添加到恰当的列表中。
- 第3步:让收集器并行工作(如果可能)
下一个方法要在并行收集时把两个部分累加器合并起来,这里,它只需要合并两个Map,即将第二个Map中质数和非质数列表中的所有数字合并到第一个Map的对应列表中就行了:
public BinaryOperator<Map<Boolean, List<Integer>>> combiner() {return (Map<Boolean, List<Integer>> map1,Map<Boolean, List<Integer>> map2) -> {map1.get(true).addAll(map2.get(true));map1.get(false).addAll(map2.get(false));return map1;};}
请注意,实际上这个收集器是不能并行使用的,因为该算法本身是顺序的。这意味着永远都不会调用combiner方法,你可以把它的实现留空(更好的做法是抛出一个UnsupportedOperationException异常)。为了让这个例子完整,我们还是决定实现它。
- 第4步:
finisher方法和收集器的characteristics方法
最后两个方法的实现都很简单。前面说过,accumulator正好就是收集器的结果,用不着进一步转换,那么finisher方法就返回identity函数:
public Function<Map<Boolean, List<Integer>>,Map<Boolean, List<Integer>>> finisher() {return Function.identity();}
就characteristics方法而言,我们已经说过,它既不是CONCURRENT也不是UNORDERED,却是IDENTITY_FINISH的:
public Set<Characteristics> characteristics() {return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));}
下面列出了最后实现的PrimeNumbersCollector。
代码清单 6-7
PrimeNumbersCollector
public class PrimeNumbersCollectorimplements Collector<Integer,Map<Boolean, List<Integer>>,Map<Boolean, List<Integer>>> {@Overridepublic Supplier<Map<Boolean, List<Integer>>> supplier() {return () -> new HashMap<Boolean, List<Integer>>() {{ ←---- 从一个有两个空List的Map开始收集过程put(true, new ArrayList<Integer>());put(false, new ArrayList<Integer>());}};}@Overridepublic BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {return (Map<Boolean, List<Integer>> acc, Integer candidate) -> {acc.get( isPrime( acc.get(true), ←---- 将已经找到的质数列表传递给isPrime方法candidate) ).add(candidate); ←---- 根据isPrime方法的返回值,从Map中取质数或非质数列表,把当前的被测数加进去};}@Overridepublic BinaryOperator<Map<Boolean, List<Integer>>> combiner() {return (Map<Boolean, List<Integer>> map1,Map<Boolean, List<Integer>> map2) -> { ←---- 将第二个Map合并到第一个map1.get(true).addAll(map2.get(true));map1.get(false).addAll(map2.get(false));return map1;};}@Overridepublic Function<Map<Boolean, List<Integer>>,Map<Boolean, List<Integer>>> finisher() {return Function.identity(); ←---- 收集过程最后无须转换,因此用identity函数收尾}@Overridepublic Set<Characteristics> characteristics() {return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH)); ←---- 这个收集器是IDENTITY_FINISH,但既不是UNORDERED也不是CONCURRENT,因为质数是按顺序发现的}}
现在你可以用这个新的自定义收集器来代替6.4节中用partitioningBy工厂方法创建的那个,并获得完全相同的结果了:
public Map<Boolean, List<Integer>>partitionPrimesWithCustomCollector(int n) {return IntStream.rangeClosed(2, n).boxed().collect(new PrimeNumbersCollector());}
6.6.2 比较收集器的性能
用partitioningBy工厂方法创建的收集器和你刚刚开发的自定义收集器在功能上是一样的,但是有没有实现用自定义收集器超越partitioningBy收集器性能的目标呢?现在让我们写个测试框架来跑一下吧:
public class CollectorHarness {public static void main(String[] args) {long fastest = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long start = System.nanoTime(); ←---- 运行测试10次partitionPrimes(1_000_000); ←---- 将前一百万个自然数按质数和非质数分区long duration = (System.nanoTime() - start) / 1_000_000; ←---- 取运行时间的毫秒值if (duration < fastest) fastest = duration; ←---- 检查这个执行是否是最快的一个}System.out.println("Fastest execution done in " + fastest + " msecs");}}
请注意,更为科学的测试方法是用一个诸如JMH的框架,但我们不想在这里把问题搞得更复杂。对这个例子而言,这个小小的测试类提供的结果足够准确了。这个类会先把前一百万个自然数分为质数和非质数,利用partitioningBy工厂方法创建的收集器调用方法10次,记下最快的一次运行。在英特尔i5 2.4 GHz的机器上运行得到了以下结果:
Fastest execution done in 4716 msecs
现在把测试框架的partitionPrimes换成partitionPrimesWithCustomCollector,以便测试我们开发的自定义收集器的性能。现在,程序打印:
Fastest execution done in 3201 msecs
还不错!这意味着开发自定义收集器并不是白费工夫,原因有二:第一,你学会了如何在需要的时候实现自己的收集器;第二,你获得了大约32%的性能提升。
最后还有一点很重要,就像代码清单6-5中的ToListCollector那样,也可以通过把实现PrimeNumbersCollector核心逻辑的三个函数传给collect方法的重载版本来获得同样的结果:
public Map<Boolean, List<Integer>> partitionPrimesWithCustomCollector(int n) {IntStream.rangeClosed(2, n).boxed().collect(() -> new HashMap<Boolean, List<Integer>>() {{ ←---- 供应源put(true, new ArrayList<Integer>());put(false, new ArrayList<Integer>());}},(acc, candidate) -> { ←---- 累加器acc.get( isPrime(acc.get(true), candidate) ).add(candidate);},(map1, map2) -> { ←---- 组合器map1.get(true).addAll(map2.get(true));map1.get(false).addAll(map2.get(false));});}
你看,这样就可以避免为实现Collector接口创建一个全新的类;得到的代码更紧凑,虽然可能可读性会差一点,可重用性会差一点。
