15.1 为支持并发而不断演进的Java

过去的20年,并发性的演进反映在计算机硬件、软件系统以及编程概念的变化上。为了支持不断演进的并发编程,Java也进行了大量的改进。总结这一演进可以帮助你更好地理解Java增加新特性的原因以及它们在程序和系统设计中所扮演的角色。

Java从一开始就提供了锁(通过synchronized类和方法)、Runnable以及线程。2004年,Java 5又引入了java.util.concurrent包,它能以更具表现力的方式支持并发,特别是ExecutorService1接口(将“任务提交”与“线程执行”解耦)、Callable以及Future,后两者使用泛型(也是从Java 5首次引入)生成一个高层封装的RunnableThread变体,可以返回执行结果。ExecutorService既可以执行Runnable也可以执行Callable。这些新特性促进了多核CPU上并行编程的发展。说句实话,没有人喜欢直接使用线程干活儿!

1ExecutorService接口继承了Executor接口,可以使用submit方法执行一个Callable;而Executor接口仅仅为Runnables提供了一个execute方法。

之后版本的Java依然持续地改进着对并发的支持,因为程序员们越来越需要更加高效地使用多核CPU的处理能力。正如第7章中介绍的,Java 7为了使用fork/join实现分而治之算法,新增了java.util.concurrent.RecursiveTask,Java 8则增加了对流和流的并行处理(依赖于新增的Lambda表达式)的支持。

通过支持组合式Future(基于Java 8 CompleteFuture实现的Future,详情请参考15.4节及第16章),Java进一步丰富了它的并发特性,Java 9提供了对分布式异步编程的显式支持。这些API为构建本章前面介绍的那种聚合型应用提供了思路和工具。在这种架构中,应用通过与各种网络服务通信,替用户实时整合需要的信息,或者将整合的信息作为进一步的网络服务提供出去。这种工作方式被称为反应式编程。Java 9通过“发布–订阅”协议(更具体地说,通过java.util.concurrent.Flow接口,详情请参考15.5节和第17章)增加了对它的支持。CompletableFuturejava.util.concurrent.Flow的关键理念是提供一种程序结构,让相互独立的任务尽可能地并发执行,通过这种方式最大化地利用多核或者多台机器提供的并发能力。

15.1.1 线程以及更高层的抽象

我们中的很多人都是从操纵系统这门课程中第一次了解线程和进程的。单CPU的计算机能支持多个用户,因为操作系统为每个用户创建了一个进程。操作系统为这些进程分配了相互独立的虚拟地址空间,这样每个用户都感觉他是在独占使用这台计算机。操作系统通过分时唤醒的方式让多个进程共享CPU资源,进一步地强化了这种假象。一个进程可以请求操作系统给它分配一个或多个线程——它们和主进程之间共享地址空间,因此可以并发地执行任务并相互协调。

在一个多核的环境中,单用户登录的笔记本电脑上可能只启动了一个用户进程,这种程序永远不能充分发挥计算机的处理能力,除非使用多线程。虽然每个核可以服务一个或多个进程或线程,但是如果你的程序并未使用多线程,那它同一时刻能有效使用的只有处理器众多核中的一个。

譬如你有个四核CPU的机器,如果安排合理,让每个CPU核都持续不停地执行有效的任务,理论上你程序的执行速度应该是单核CPU执行速度的四倍(当然,程序调度也会有开销,所以实际达不到这么多)。举个例子,假如你有一个容量为1 000 000个数字大小的数组,其中保存了学生给出的正确答案的数目。下面的程序运行在单个线程上(该程序在单核年代运行得很顺畅):

  1. long sum = 0;
  2. for (int i = 0; i < 1_000_000; i++) {
  3. sum += stats[i];
  4. }

将上面的程序与使用四个线程的版本进行比较,其中第一个线程执行:

  1. long sum0 = 0;
  2. for (int i = 0; i < 250_000; i++) {
  3. sum0 += stats[i];
  4. }

第四个线程执行:

  1. long sum3 = 0;
  2. for (int i = 750_000; i < 1_000_000; i++) {
  3. sum3 += stats[i];
  4. }

这四个线程在main程序中通过Java的.start()方法启动,使用.join()等待其执行完成,最后执行计算:

  1. sum = sum0 + ... + sum3;

问题是执行这种for循环既乏味又容易出错。另外,你该如何处理那些不在循环中的代码呢?

第7章展示了如何使用Java的Stream轻而易举地通过内部迭代而非外部迭代(显式的循环)实现并行:

  1. sum = Arrays.stream(stats).parallel().sum();

这里希望大家记得的是,对并行流的迭代是比显式使用线程更高级的概念。换句话说,使用流(Stream)是对一种线程使用模式的抽象。将这种抽象引入流就像使用一种设计模式,带来的好处是程序员不再需要编写枯燥的模板代码了,库中的实现隐藏了代码大部分的复杂性。第7章还介绍了如何使用自Java 7才支持的java.util.concurrent.RecursiveTask,它对线程的fork/join进行了抽象,可以并发地执行分而治之算法,用一种更高级的方式在多核机器上高效地执行数组求和计算。

学习更多的线程抽象方法之前,来复习一下ExecutorService(由Java 5引入),以及构建这些抽象的基础——线程池。

15.1.2 执行器和线程池

Java 5提供了执行器框架,其思想类似于一个高层的线程池,可以充分发挥线程的能力。执行器使得程序员有机会解耦任务的提交与任务的执行。

  • 线程的问题

Java线程直接访问操作系统的线程。这里主要的问题在于创建和删除操作系统线程的代价很大(涉及页表操作),并且一个系统中能创建的线程数目是有限的。如果创建的线程数超过操作系统的限制,很可能导致Java应用莫名其妙地崩溃,因此你需要特别留意,不要在线程运行时持续不断地创建新线程。

操作系统(以及Java)的线程数都远远大于硬件线程数2,因此即便一些操作系统线程被阻塞了,或者处于睡眠状态,所有的硬件线程还是会被完全占据,繁忙地执行着指令。举个例子,2016年英特尔公司生产的酷睿i7-6900K服务器处理器有八个核,每个核上有两个对称多处理(SMP)的硬件线程,这样算下来就有16个硬件线程。服务器上很可能有好多个这样的处理器,最终一台服务器上可能有64个硬件线程。与此相反,笔记本电脑可能就只有一个或者两个硬件线程,因此,移植程序时,不能想当然地假设可以使用多少个硬件线程。而某个程序中Java线程的最优数目往往依赖于硬件核的数目。

  • 线程池的优势

Java的ExecutorService提供了一个接口,用户可以提交任务并获取它们的执行结果。期望的实现是使用newFixedThreadPool这样的工厂方法创建一个线程池:

  1. ExecutorService newFixedThreadPool(int nThreads)

这个方法会创建一个包含nThreads(通常称为工作线程)的ExecutorService,新创建的线程会被放入一个线程池,每次有新任务请求时,以先来先到的策略从线程池中选取未被使用的线程执行提交的任务请求。任务执行完毕之后,这些线程又会被归还给线程池。这种方式的最大优势在于能以很低的成本向线程池提交上千个任务,同时保证硬件匹配的任务执行。此外,你还有一些选项可以对ExecutorService进行配置,譬如队列长度、拒绝策略以及不同任务的优先级等。

请注意这里使用的术语:程序员提供任务(它可以是一个Runnable或者Callable),由线程负责执行。

  • 线程池的不足

大多数情况下,使用线程池都比直接操纵线程要好,不过你也需要特别留意使用线程池的两个陷阱。

  • 使用k个线程的线程池只能并发地执行k个任务。提交的任务如果超过这个限制,线程池不会创建新线程去执行该任务,这些超限的任务会被加入等待队列,直到现有任务执行完毕才会重新调度空闲线程去执行新任务。通常情况下,这种工作模式运行得很好,它让你可以一次提交多个任务,而不必随机地创建大量的线程。然而,采用这种方式时你需要特别留意任务是否存在会进入睡眠、等待I/O结束或者等待网络连接的情况。一旦发生阻塞式I/O,这些任务占用了线程,却会由于等待无法执行有价值的工作。假设你的CPU有4个硬件线程,创建的线程池大小为5,你一次性提交了20个执行任务(如图15-3所示)。你希望这些任务会并发地执行,直到所有20个任务执行完毕。假设首批提交的线程中有3个线程进入了睡眠状态或者在等待I/O,那就只剩2个线程可以服务剩下的15个任务了。如此一来,你只能取得你之前预期吞吐量的一半(如果你创建的线程池中工作线程数为8,那么还是能取得同样预期吞吐量的)。如果早期提交的任务或者正在执行的任务需要等待后续任务,而这也正是Future典型的使用模式,那么可能会导致线程池死锁。
    15.1 为支持并发而不断演进的Java - 图3

图 15-3 睡眠线程会降低线程池的吞吐量

这里希望大家牢记的是,尽量避免向线程池提交可能阻塞(譬如睡眠,或者要等待某个事件)的任务,然而这一点在遗留系统中可能无法避免。

  • 通常情况下,Java从main返回之前,都会等待所有的线程执行完毕,从而避免误杀正在执行关键代码的线程。因此,实际操作时的一个好习惯是在退出程序执行之前,确保关闭每一个线程池(因为线程池中的工作线程在创建完后会由于要等待另一个任务执行完毕而无法正常终止)。实践中,我们经常使用一个长时间运行的ExecutorService管理需要持续运行的互联网服务。
    Java也提供了Thread.setDaemon方法来控制这种行为,下一节讨论这一内容。

2描述这一点时我们曾经使用过“核”,不过像英特尔酷睿i7-6900K这样的CPU每个核上又有多个硬件线程,因此CPU即使经历了短暂的延迟,譬如缓存未命中,还是能继续执行指令。

15.1.3 其他的线程抽象:非嵌套方法调用

为了解释为什么本章使用的并发形式与第7章(并行流处理以及fork/join框架)不同,我们不得不提到第7章的使用形式都有个特殊的属性:无论什么时候,任何任务(或者线程)在方法调用中启动时,都会在其返回之前调用同一个方法。换句话说,线程创建以及与其匹配的join()在调用返回的嵌套方法调用中都以嵌套的方式成对出现。这种思想被称为严格fork/join,如图15-4所示。

15.1 为支持并发而不断演进的Java - 图4

图 15-4 严格的fork/join。箭头代表线程,圆圈代表fork和join,方框代表方法调用和返回

以一种更加松散的形式组织fork/join其实也无伤大雅,这种方式下子任务从内部方法调用中逃逸出来,在外层调用中执行join,这样提供给用户的接口看起来还是一个普通调用3,如图15-5所示。

3对比“函数式的思考”(第18章),该章讨论了如何将内部使用有副作用的方法改造为无副作用的接口。

15.1 为支持并发而不断演进的Java - 图5

图 15-5 灵活的fork/join

本章着重讨论多种多样的并发形态,其中用户的方法调用创建的线程(或者派生的任务)可能比该调用方法的生命周期还长,如图15-6所示。

15.1 为支持并发而不断演进的Java - 图6

图 15-6 一种异步方法

这种类型的方法常常被称作异步方法,它的名字源于该方法所派生的任务会继续执行调用方法希望它完成的工作。15.2节会介绍Java 8和Java 9中受益于该方法的新特性。不过,先来看看采用这种方法会有哪些潜在的危害。

  • 子线程与执行方法调用的代码会并发执行,因此为了避免出现数据竞争,编写代码时需要特别小心。
  • 如果Java的main()方法在子线程终止之前返回,会发生什么情况?有两种可能性,然而它们都不是我们期望的。
    • 等待所有的线程都执行完毕,再退出主应用的执行。
    • 直接杀死所有无法正常终止的线程,然后退出程序的执行。

前一个方案可能由于等待一个一直无法顺利结束的线程,最终导致应用崩溃;后一个方案有可能中断一个写磁盘的I/O序列,导致外部数据出现不一致的现象。为了避免这些问题,你需要确保你的程序能有效地跟踪它创建的线程,且退出程序运行(包括线程池的关闭)之前必须加入这些线程。

依据有没有执行setDaemon()方法,Java线程可以被划分为守护进程以及非守护进程。守护进程的线程在退出时就被终止(因此特别适合作为服务,因为它不会导致磁盘数据不一致),而从主程序返回的线程还得继续等待,直到所有非守护线程都终止了,应用才能退出执行。

15.1.4 你希望线程为你带来什么

你希望采用线程技术梳理程序的结构,以便在需要的时候享受程序并行带来的好处,生成足够多的任务以充分利用所有硬件线程。这意味着你需要对程序进行切分,把它划分成很多小任务(不过也不能太小,因为任务切换也存在开销)。我们已经在第7章中学习了如何使用并行流处理和fork/join对for循环以及分而治之算法进行处理,本章接下来(包括第16章和第17章)会学习如何避免使用冗长的线程操作模板代码去处理方法调用。