6.4 模拟系统

并行化流操作的用武之地是使用简单操作处理大量数据,比如模拟系统。本节我们会搭建一个简易的模拟系统来理解摇骰子,但其中的原理对于大型、真实的系统也适用。

我们这里要讨论的是蒙特卡洛模拟法。蒙特卡洛模拟法会重复相同的模拟很多次,每次模拟都使用随机生成的种子。每次模拟的结果都被记录下来,汇总得到一个对系统的全面模拟。蒙特卡洛模拟法被大量用在工程、金融和科学计算领域。

如果公平地掷两次骰子,然后将赢的一面上的点数相加,就会得到一个2~12的数字。点数的和至少是2,因为骰子六个面上最小的点数是1,而我们将骰子掷了两次;点数的和最大超不过12,因为骰子点数最多的一面也不过6点。我们想要得出点数落在2~12之间每个值的概率。

解决该问题的方法之一是求出掷骰子的所有组合,比如,得到2点的方式是第一次掷得1点,第二次也掷得1点。总共有36种可能的组合,因此,掷得2点的概率就是1/36。

另外一种解法是使用1到6的随机数模拟掷骰子事件,然后用得到每个点数的次数除以总的投掷次数。这就是一个简单的蒙特卡洛模拟。模拟投掷骰子的次数越多,得到的结果越准确,因此,我们希望尽可能多地增加模拟次数。

例6-3展示了如何使用流实现蒙特卡洛模拟法。N代表模拟次数,在➊处使用IntStreamrange方法创建大小为N的流,在➋处调用parallel方法使用流的并行化操作,twoDiceThrows函数模拟了连续掷两次骰子事件,返回值是两次点数之和。在➌处使用mapToObj方法以便在流上使用该函数。

例6-3 使用蒙特卡洛模拟法并行化模拟掷骰子事件

  1. public Map<Integer, Double> parallelDiceRolls() {
  2. double fraction = 1.0 / N;
  3. return IntStream.range(0, N)
  4. .parallel()
  5. .mapToObj(twoDiceThrows())
  6. .collect(groupingBy(side -> side,
  7. summingDouble(n -> fraction)));
  8. }

在➍处得到了需要合并的所有结果的流,使用前一章介绍的groupingBy方法将点数一样的结果合并。我说过要计算每个点数的出现次数,然后除以总的模拟次数N。在流框架中,将数字映射为1/N并且相加很简单,这和前面说的计算方法是等价的。在➎处我们使用summingDouble方法完成了这一步。最终的返回值类型是Map,是点数之和到它们的概率的映射。

我得承认这段代码不算儿戏,但使用5行代码即能实现蒙特卡洛模拟法还是很精巧的。重要的是模拟的次数越多,得到的结果越准确,因此我们运行多次模拟的动机就会更加强烈。这是一个很好的并行化案列,并行化能带来速度的提升。

我已经带领读者浏览了整个实现细节,为了对比,例6-4给出了手动实现并行化蒙特卡洛模拟法的代码。可以看到,大多数代码都在处理调度和等待线程池中的某项任务完成。而使用并行化的流时,这些都不用程序员手动管理。

例6-4 通过手动使用线程模拟掷骰子事件

  1. public class ManualDiceRolls {
  2. private static final int N = 100000000;
  3. private final double fraction;
  4. private final Map<Integer, Double> results;
  5. private final int numberOfThreads;
  6. private final ExecutorService executor;
  7. private final int workPerThread;
  8. public static void main(String[] args) {
  9. ManualDiceRolls roles = new ManualDiceRolls();
  10. roles.simulateDiceRoles();
  11. }
  12. public ManualDiceRolls() {
  13. fraction = 1.0 / N;
  14. results = new ConcurrentHashMap<>();
  15. numberOfThreads = Runtime.getRuntime().availableProcessors();
  16. executor = Executors.newFixedThreadPool(numberOfThreads);
  17. workPerThread = N / numberOfThreads;
  18. }
  19. public void simulateDiceRoles() {
  20. List<Future<?>> futures = submitJobs();
  21. awaitCompletion(futures);
  22. printResults();
  23. }
  24. private void printResults() {
  25. results.entrySet()
  26. .forEach(System.out::println);
  27. }
  28. private List<Future<?>> submitJobs() {
  29. List<Future<?>> futures = new ArrayList<>();
  30. for (int i = 0; i < numberOfThreads; i++) {
  31. futures.add(executor.submit(makeJob()));
  32. }
  33. return futures;
  34. }
  35. private Runnable makeJob() {
  36. return () -> {
  37. ThreadLocalRandom random = ThreadLocalRandom.current();
  38. for (int i = 0; i < workPerThread; i++) {
  39. int entry = twoDiceThrows(random);
  40. accumulateResult(entry);
  41. }
  42. };
  43. }
  44. private void accumulateResult(int entry) {
  45. results.compute(entry, (key, previous) ->
  46. previous == null ? fraction
  47. : previous + fraction
  48. );
  49. }
  50. private int twoDiceThrows(ThreadLocalRandom random) {
  51. int firstThrow = random.nextInt(1, 7);
  52. int secondThrow = random.nextInt(1, 7);
  53. return firstThrow + secondThrow;
  54. }
  55. private void awaitCompletion(List<Future<?>> futures) {
  56. futures.forEach((future) -> {
  57. try {
  58. future.get();
  59. } catch (InterruptedException | ExecutionException e) {
  60. e.printStackTrace();
  61. }
  62. });
  63. executor.shutdown();
  64. }
  65. }