6.4 模拟系统
并行化流操作的用武之地是使用简单操作处理大量数据,比如模拟系统。本节我们会搭建一个简易的模拟系统来理解摇骰子,但其中的原理对于大型、真实的系统也适用。
我们这里要讨论的是蒙特卡洛模拟法。蒙特卡洛模拟法会重复相同的模拟很多次,每次模拟都使用随机生成的种子。每次模拟的结果都被记录下来,汇总得到一个对系统的全面模拟。蒙特卡洛模拟法被大量用在工程、金融和科学计算领域。
如果公平地掷两次骰子,然后将赢的一面上的点数相加,就会得到一个2~12的数字。点数的和至少是2,因为骰子六个面上最小的点数是1,而我们将骰子掷了两次;点数的和最大超不过12,因为骰子点数最多的一面也不过6点。我们想要得出点数落在2~12之间每个值的概率。
解决该问题的方法之一是求出掷骰子的所有组合,比如,得到2点的方式是第一次掷得1点,第二次也掷得1点。总共有36种可能的组合,因此,掷得2点的概率就是1/36。
另外一种解法是使用1到6的随机数模拟掷骰子事件,然后用得到每个点数的次数除以总的投掷次数。这就是一个简单的蒙特卡洛模拟。模拟投掷骰子的次数越多,得到的结果越准确,因此,我们希望尽可能多地增加模拟次数。
例6-3展示了如何使用流实现蒙特卡洛模拟法。N代表模拟次数,在➊处使用IntStream的range方法创建大小为N的流,在➋处调用parallel方法使用流的并行化操作,twoDiceThrows函数模拟了连续掷两次骰子事件,返回值是两次点数之和。在➌处使用mapToObj方法以便在流上使用该函数。
例6-3 使用蒙特卡洛模拟法并行化模拟掷骰子事件
public Map<Integer, Double> parallelDiceRolls() {double fraction = 1.0 / N;return IntStream.range(0, N) ➊.parallel() ➋.mapToObj(twoDiceThrows()) ➌.collect(groupingBy(side -> side, ➍summingDouble(n -> fraction))); ➎}
在➍处得到了需要合并的所有结果的流,使用前一章介绍的groupingBy方法将点数一样的结果合并。我说过要计算每个点数的出现次数,然后除以总的模拟次数N。在流框架中,将数字映射为1/N并且相加很简单,这和前面说的计算方法是等价的。在➎处我们使用summingDouble方法完成了这一步。最终的返回值类型是Map,是点数之和到它们的概率的映射。
我得承认这段代码不算儿戏,但使用5行代码即能实现蒙特卡洛模拟法还是很精巧的。重要的是模拟的次数越多,得到的结果越准确,因此我们运行多次模拟的动机就会更加强烈。这是一个很好的并行化案列,并行化能带来速度的提升。
我已经带领读者浏览了整个实现细节,为了对比,例6-4给出了手动实现并行化蒙特卡洛模拟法的代码。可以看到,大多数代码都在处理调度和等待线程池中的某项任务完成。而使用并行化的流时,这些都不用程序员手动管理。
例6-4 通过手动使用线程模拟掷骰子事件
public class ManualDiceRolls {private static final int N = 100000000;private final double fraction;private final Map<Integer, Double> results;private final int numberOfThreads;private final ExecutorService executor;private final int workPerThread;public static void main(String[] args) {ManualDiceRolls roles = new ManualDiceRolls();roles.simulateDiceRoles();}public ManualDiceRolls() {fraction = 1.0 / N;results = new ConcurrentHashMap<>();numberOfThreads = Runtime.getRuntime().availableProcessors();executor = Executors.newFixedThreadPool(numberOfThreads);workPerThread = N / numberOfThreads;}public void simulateDiceRoles() {List<Future<?>> futures = submitJobs();awaitCompletion(futures);printResults();}private void printResults() {results.entrySet().forEach(System.out::println);}private List<Future<?>> submitJobs() {List<Future<?>> futures = new ArrayList<>();for (int i = 0; i < numberOfThreads; i++) {futures.add(executor.submit(makeJob()));}return futures;}private Runnable makeJob() {return () -> {ThreadLocalRandom random = ThreadLocalRandom.current();for (int i = 0; i < workPerThread; i++) {int entry = twoDiceThrows(random);accumulateResult(entry);}};}private void accumulateResult(int entry) {results.compute(entry, (key, previous) ->previous == null ? fraction: previous + fraction);}private int twoDiceThrows(ThreadLocalRandom random) {int firstThrow = random.nextInt(1, 7);int secondThrow = random.nextInt(1, 7);return firstThrow + secondThrow;}private void awaitCompletion(List<Future<?>> futures) {futures.forEach((future) -> {try {future.get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});executor.shutdown();}}
