3.3 第二个例子:客户端/服务器环境下的并发处理

客户端/服务器模型是一种软件架构,基于这种模型的应用程序被划分为两个部分:提供资源(数据、操作、打印机、存储等)的服务器端和使用服务器端所提供的资源的客户端。传统上,这种架构用于企业领域,但是随着互联网的蓬勃发展,至今它仍然是一个很现实的主题。你也可以将Web应用程序视为客户端/服务器应用程序,其服务器部分就是在Web服务器中执行的应用程序后台部分,而Web浏览器则执行应用程序客户端部分。SOA(service-oriented architecture,面向服务的架构)是客户端/服务器架构的另一个例子,其中公开的Web服务即为其服务器部分,而使用这些服务的各种客户端则是客户端部分。

在客户端/服务器环境中,我们通常都有一台服务器和很多使用该服务器所提供服务的客户端,因此设计这样的系统时,服务器的性能方面非常重要。

在本节,我们将实现一个简单的客户端/服务器应用程序。它将对某银行发布的发展指数进行数据搜索。

该服务器主要有以下特点。

  • 客户端与服务器都使用套接字连接。
  • 客户端将以字符串形式发送查询,而服务器将用另一个字符串返回结果。
  • 服务器可以响应三种不同查询。
    • Query:这种查询的格式是q;codCountry;codIndicator;year,其中codCountry是国家代码,codIndicator是指数代码,而year是一个可选参数,表示你想要查询的年份。服务器的响应信息将以单个字符串的形式返回。
    • Report:这种查询的格式是r;codIndicator,其中codIndicator是你要制表的指数代码。服务器将以单个字符串形式响应各年份所有国家该指数的平均值。
    • Stop:这种查询的格式是z;接收到该命令时,服务器将停止执行。
  • 在其他情况下,服务器将返回一个错误消息。

与前面的例子相同,下文将展示如何实现该客户端/服务器应用程序的串行版本。然后,将展示如何使用执行器实现其并发版本。最后,我们将比较这两种解决方案,以审视在这种情况下使用并发处理的优点。

3.3.1 客户端/服务器:串行版

服务器应用程序的串行版本主要有三个部件。

  • DAO(data access object,数据访问对象)部件,负责访问数据并且获取查询结果。
  • 命令部件,由各种查询的命令组成。
  • 服务器部件,接收查询,调用对应命令,并且向客户端返回结果。

下面仔细了解一下上述部件。

  • DAO部件

正如我们前面提到的,服务器将针对发展指数进行数据搜索。该数据以CSV文件存放。该应用程序中的DAO组件将整个文件加载到内存的一个List对象中。它为涉及的每个查询都实现一个方法,而这些方法通过搜索该列表查找数据。

在此我们不介绍这个类的代码,因为很容易实现,而且它也不是本书所要讲述的重点内容。

  • 命令部件

命令部件是DAO部件和服务器部件之间的中介。我们实现了一个基本抽象类Command,它是所有命令的基类。

  1. public abstract class Command {
  2. protected final String[] command;
  3. public Command (String [] command) {
  4. this.command=command;
  5. }
  6. public abstract String execute ();
  7. }

然后,我们为每一个查询实现了一条命令。查询在QueryCommand类中实现,其execute()方法如下所示:

  1. public String execute() {
  2. WDIDAOdao=WDIDAO.getDAO();
  3. if (command.length==3) {
  4. return dao.query(command[1], command[2]);
  5. } else if (command.length==4) {
  6. try {
  7. return dao.query(command[1], command[2],
  8. Short.parseShort(command[3]));
  9. } catch (Exception e) {
  10. return "ERROR;Bad Command";
  11. }
  12. } else {
  13. return "ERROR;Bad Command";
  14. }
  15. }

Report查询在ReportCommand中实现,其execute()方法如下所示:

  1. @Override
  2. public String execute() {
  3. WDIDAOdao=WDIDAO.getDAO();
  4. return dao.report(command[1]);
  5. }

Stop查询在StopCommand类中实现,其execute()方法如下所示:

  1. @Override
  2. public String execute() {
  3. return "Server stopped";
  4. }

最后,出错的情况通过ErrorCommand类处理,其execute()方法如下所示:

  1. @Override
  2. public String execute() {
  3. return "Unknown command: "+command[0];
  4. }
  • 服务器部件

最后,服务器部件在SerialServer类中实现。首先,它通过调用getDAO()方法初始化DAO。其主要目的是使用DAO加载所有数据。

  1. public class SerialServer {
  2. public static void main(String[] args) throws IOException {
  3. WDIDAOdao = WDIDAO.getDAO();
  4. booleanstopServer = false;
  5. System.out.println("Initialization completed.");
  6. try (ServerSocketserverSocket = new ServerSocket(Constants
  7. .SERIAL_PORT)) {

此后,我们需要执行一个循环,直到该服务器接收到一个Stop查询为止。该循环执行以下四步。

  • 接收来自客户端的查询。
  • 解析并分割该查询的要素。
  • 调用对应的命令。
  • 向客户端返回结果。
    这四个步骤的实现如下述代码片段所示:
  1. do {
  2. try (Socket clientSocket = serverSocket.accept();
  3. PrintWriter out = new PrintWriter(clientSocket.getOutputStream(),
  4. true);
  5. BufferedReader in = new BufferedReader(new InputStreamReader
  6. (clientSocket.getInputStream()));) {
  7. String line = in.readLine();
  8. Command command;
  9. String[] commandData = line.split(";");
  10. System.out.println("Command: " + commandData[0]);
  11. switch (commandData[0]) {
  12. case "q":
  13. System.out.println("Query");
  14. command = new QueryCommand(commandData);
  15. break;
  16. case "r":
  17. System.out.println("Report");
  18. command = new ReportCommand(commandData);
  19. break;
  20. case "z":
  21. System.out.println("Stop");
  22. command = new StopCommand(commandData);
  23. stopServer = true;
  24. break;
  25. default:
  26. System.out.println("Error");
  27. command = new ErrorCommand(commandData);
  28. }
  29. String response = command.execute();
  30. System.out.println(response);
  31. } catch (IOException e) {
  32. e.printStackTrace();
  33. }
  34. } while (!stopServer);

3.3.2 客户端/服务器:并行版本

在串行版本中,服务器部件存在一个非常严重的缺陷。当处理一个查询时并不能兼顾其他查询。如果响应每个查询请求或特定请求需要耗费大量时间,那么服务器的性能就会很低。

我们可以使用并发处理获得更好的性能。如果服务器收到请求后创建了一个线程,它可以将该查询的所有处理委托该线程,并开始处理新的请求。这种方法也存在一些问题。如果我们接收到了大量查询,则会导致系统因创建太多线程而不堪重负。但是如果我们使用线程数固定的执行器,就可以控制服务器所使用的资源,并获得比串行版本更好的性能。

为了使用执行器将串行版服务器部件转换为并发版,必须修改服务器端。DAO部件是相同的,虽然我们也已经更改了实现命令部件的类名,但是实现过程几乎相同。只有Stop查询发生了改变,因为现在它有了更多职能。下面我们了解一下并发版服务器部件的实现细节。

  • 服务器部件

并发服务器部件在ConcurrentServer部件中实现。我们为其加入了两项串行服务器不具备的要素:在ParallelCache类中实现的缓存系统和在Logger类中实现的日志系统。首先,并发服务器调用getDAO()方法初始化DAO部分。主要目标是DAO加载所有数据并且使用Executors类的newFixedThreadPool()方法创建一个ThreadPoolExecutor对象。该方法接收的是我们要在服务器中使用的最大工作线程数。执行器绝不会超过该工作线程数。要获得工作线程数,我们要使用Runtime类的availableProcessors()方法获取系统的核数。

  1. public class ConcurrentServer {
  2. private static ThreadPoolExecutor executor;
  3. private static ParallelCache cache;
  4. private static ServerSocketserverSocket;
  5. private static volatileboolean stopped=false;
  6. public static void main(String[] args) {
  7. serverSocket=null;
  8. WDIDAOdao=WDIDAO.getDAO();
  9. executor=(ThreadPoolExecutor) Executors.newFixedThreadPool
  10. (Runtime.getRuntime().availableProcessors());
  11. cache=new ParallelCache();
  12. Logger.initializeLog();
  13. System.out.println("Initialization completed.");

布尔变量stopped被声明为volatile型,因为另一个线程可以更改它。当stopped变量被另一个线程设置为true时,volatile关键字确保了这一改变在主方法中可见。如果没有volatile关键字,由于CPU缓存或者编译优化方面的原因,这样的改变则不可见。然后,我们初始化ServerSocket以监听请求:

  1. serverSocket = new ServerSocket(Constants.CONCURRENT_PORT);

我们不能使用一个try-with-resources语句管理服务器socket。当我们收到Stop命令后需要关闭服务器,但是服务器正在等待serverSocket对象的accept()方法。为了迫使服务器丢弃该方法,需要显式关闭服务器(我们将在shutdown()方法中执行这一操作),因此不能使用try-with-resources语句关闭socket。

此后,我们会执行一个循环,直到服务器接收到一个Stop查询为止。该循环执行如下三个步骤。

  • 从客户端接收一个查询。
  • 创建一个任务处理该查询。
  • 将该任务发送给执行器。
    下面的代码片段展示了这三个步骤:
  1. do {
  2. try {
  3. Socket clientSocket = serverSocket.accept();
  4. RequestTask task = new RequestTask(clientSocket);
  5. executor.execute(task);
  6. } catch (IOException e) {
  7. e.printStackTrace();
  8. }
  9. } while (!stopped);

最后,一旦服务器完成了执行(离开了该循环),则必须使用awaitTermination()方法结束该执行器。该执行器完成execution()方法前,该方法将一直阻塞主线程。然后,关闭缓存系统,等待一条表明服务器执行完毕的消息,如下所示:

  1. executor.awaitTermination(1, TimeUnit.DAYS);
  2. System.out.println("Shutting down cache");
  3. cache.shutdown();
  4. System.out.println("Cache ok");
  5. System.out.println("Main server thread ended");

我们已经额外加入了两种方法:一种是getExecutor()方法,它返回了用于执行并发任务的ThreadPoolExecutor对象;另一种是shutdown()方法,它用于按照顺序结束服务器的执行器,该方法调用了执行器的shutdown()方法并且关闭ServerSocket

  1. public static void shutdown() {
  2. stopped = true;
  3. System.out.println("Shutting down the server...");
  4. System.out.println("Shutting down executor");
  5. executor.shutdown();
  6. System.out.println("Executor ok");
  7. System.out.println("Closing socket");
  8. try {
  9. serverSocket.close();
  10. System.out.println("Socket ok");
  11. } catch (IOException e) {
  12. e.printStackTrace();
  13. }
  14. System.out.println("Shutting down logger");
  15. Logger.sendMessage("Shutting down the logger");
  16. Logger.shutdown();
  17. System.out.println("Logger ok");
  18. }

在并发服务器中有一个关键部件:处理每个客户端请求的RequestTask类。该类实现了Runnable接口,这样它就可以以并发方式在执行器中执行。其构造函数将接收用于与客户端通信的Socket参数。

  1. public class RequestTask implements Runnable {
  2. private final Socket clientSocket;
  3. public RequestTask(Socket clientSocket) {
  4. this.clientSocket = clientSocket;
  5. }

响应每个请求时,run()方法所完成的操作与串行服务器所做的操作相同。

  • 接收客户端查询。
  • 解析并分割该查询的要素。
  • 调用相应的命令。
  • 向客户端返回结果。
    其代码片段如下所示:
  1. public void run() {
  2. try (PrintWriter out = new PrintWriter(clientSocket
  3. .getOutputStream(), true);
  4. BufferedReader in = new BufferedReader(new InputStreamReader
  5. (clientSocket.getInputStream()));) {
  6. String line = in.readLine();
  7. Logger.sendMessage(line);
  8. ParallelCache cache = ConcurrentServer.getCache();
  9. String ret = cache.get(line);
  10. if (ret == null) {
  11. Command command;
  12. String[] commandData = line.split(";");
  13. System.out.println("Command: " + commandData[0]);
  14. switch (commandData[0]) {
  15. case "q":
  16. System.err.println("Query");
  17. command = new ConcurrentQueryCommand(commandData);
  18. break;
  19. case "r":
  20. System.err.println("Report");
  21. command = new ConcurrentReportCommand(commandData);
  22. break;
  23. case "s":
  24. System.err.println("Status");
  25. command = new ConcurrentStatusCommand(commandData);
  26. break;
  27. case "z":
  28. System.err.println("Stop");
  29. command = new ConcurrentStopCommand(commandData);
  30. break;
  31. default:
  32. System.err.println("Error");
  33. command = new ConcurrentErrorCommand(commandData);
  34. break;
  35. }
  36. ret = command.execute();
  37. if (command.isCacheable()) {
  38. cache.put(line, ret);
  39. }
  40. } else {
  41. Logger.sendMessage("Command "+line+" was found in the cache");
  42. }
  43. System.out.println(ret);
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. } finally {
  47. try {
  48. clientSocket.close();
  49. } catch (IOException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. }
  • 命令部件

正如前面的代码片段所示,我们重命名了命令部件中的所有类。除了ConcurrentStopCommand类之外,其他实现过程都相同。现在,命令部件调用ConcurrentServer类的shutdown()方法按照顺序结束服务器的执行。execute()方法的源代码如下:

  1. @Override
  2. public String execute() {
  3. ConcurrentServer.shutdown();
  4. return "Server stopped";
  5. }

同样,现在Command类包含了一个新的Boolean型的isCacheable()方法,如果缓存中存放了命令的结果,则该方法返回true,否则返回false

3.3.3 额外的并发服务器组件

我们已经实现了一些额外的并发服务器组件:返回服务器状态信息的新命令,存储命令执行结果以便在重复请求时节省时间的缓存系统,以及记录错误信息和调试信息的日志系统。接下来将介绍这些组件。

  • 状态命令

首先,我们有了一种新的查询。它有自己的格式,并且通过ConcurrentStatusCommand类处理。该类可获取服务器所使用的ThreadPoolExecutor对象,并且获取该执行器的相关状态信息:

  1. public class ConcurrentStatusCommand extends Command {
  2. public ConcurrentStatusCommand (String[] command) {
  3. super(command);
  4. setCacheable(false);
  5. }
  6. @Override
  7. public String execute() {
  8. StringBuildersb=new StringBuilder();
  9. ThreadPoolExecutor executor=ConcurrentServer.getExecutor();
  10. Logger.sendMessage(sb.toString());
  11. return sb.toString();
  12. }
  13. }

可以从该服务器获取的信息如下。

  • getActiveCount():该方法返回执行并发任务的大致任务数。线程池中可能有更多线程,但是它们都是空闲的。
  • getMaximumPoolSize():该方法返回了执行器可拥有的工作线程的最大数目。
  • getCorePoolSize():该方法返回了执行器拥有的核心工作线程数目。这个数字决定了线程池中线程数的最小值。
  • getPoolSize():该方法返回了当前线程池中的线程数。
  • getLargestPoolSize():该方法返回了线程池在执行期间的最大线程数。
  • getCompletedTaskCount():该方法返回了执行器已经执行的任务数。
  • getTaskCount():该方法返回了已预定执行任务的大致数目。
  • getQueue().size():该方法返回了在任务队列中等待的任务数。
    因为使用Executor类的newFixedThreadPool()方法创建了执行器,那么它的最大工作线程数和核心工作线程数相同。
  • 缓存系统

并行服务器中带有一个缓存系统,其作用是避免重复搜索那些近期已经进行过的数据。该缓存系统有如下三个要素。

  • CacheItem类:该类用于描述在缓存中存放的每个元素,而且它有如下四个属性。
    • 在缓存中存储的命令。我们将QueryReport命令存放在缓存之中。
    • 该命令所产生的响应。
    • 缓存中某一项的创建日期。
    • 该项在缓存中的最后访问时间。
  • CleanCacheTask类:如果在缓存中存储所有命令并从未删除,那么缓存的大小就会无限制增加。为了避免这种情况,我们还可以创建一个任务删除缓存中的元素,并将该任务作为一个Thread对象实现。有如下两种供选方案。
    • 你可以为缓存设定最大规模。如果缓存中的元素数大于最大值,就可以将那些近期很少访问的元素删除。
    • 你可以删除缓存中那些在某个预定时段内未被访问的元素。我们将要采用的就是这种方式。
  • ParallelCache类:该类实现了在缓存中存储和检索各元素的操作。为了在缓存中存储数据,我们采用了一种ConcurrentHashMap数据结构。因为缓存由服务器所有任务共享,我们必须采用一种同步机制保护对缓存的访问,以避免数据竞争条件。有如下三种供选方案。
    • 我们可以使用一种non-synchronized型的数据结构(例如HashMap)并且加入必要代码同步对该数据结构的各种访问,例如,采用锁。你也可以使用Collections类的synchronizedMap()方法将一个HashMap转换为一个synchronized型结构。
    • 使用synchronized型的数据结构,例如Hashtable。对于这种情况,我们不会形成数据竞争条件,但是性能会更好。
    • 使用并发数据结构,例如ConcurrentHashMap类,该类消除了出现数据竞争条件的可能性,而且该类被优化用于高并发环境中。我们将使用ConcurrentHashMap类的对象实现这种方案。
      CleanCacheTask类的代码如下:
  1. public class CleanCacheTask implements Runnable {
  2. private final ParallelCache cache;
  3. public CleanCacheTask(ParallelCache cache) {
  4. this.cache = cache;
  5. }
  6. @Override
  7. public void run() {
  8. try {
  9. while (!Thread.currentThread().interrupted()) {
  10. TimeUnit.SECONDS.sleep(10);
  11. cache.cleanCache();
  12. }
  13. } catch (InterruptedException e) {
  14. }
  15. }
  16. }

该类中有一个ParallelCache对象,并且每10秒钟,就会执行ParallelCache实例的cleanCache()方法。ParallelCache类有五种不同的方法。首先,该类的构造函数初始化了该缓存的元素。它创建了ConcurrentHashMap对象并且启动了一个执行CleanCacheTask类的线程:

  1. public class ParallelCache {
  2. private final ConcurrentHashMap<String, CacheItem> cache;
  3. private final CleanCacheTask task;
  4. private final Thread thread;
  5. public static intMAX_LIVING_TIME_MILLIS = 600_000;
  6. public ParallelCache() {
  7. cache=new ConcurrentHashMap<>();
  8. task=new CleanCacheTask(this);
  9. thread=new Thread(task);
  10. thread.start();
  11. }

然后,该类中还有两个方法可用于存储和检索缓存中的元素。我们使用put()方法将元素插入HashMap,并使用get()方法在HashMap中检索元素:

  1. public void put(String command, String response) {
  2. CacheItem item = new CacheItem(command, response);
  3. cache.put(command, item);
  4. }
  5. public String get (String command) {
  6. CacheItem item=cache.get(command);
  7. if (item==null) {
  8. return null;
  9. }
  10. item.setAccessDate(new Date());
  11. return item.getResponse();
  12. }

然后,CleanCacheTask类清理缓存的方法如下:

  1. public void cleanCache() {
  2. Date revisionDate = new Date();
  3. Iterator<CacheItem> iterator = cache.values().iterator();
  4. while (iterator.hasNext()) {
  5. CacheItem item = iterator.next();
  6. if (revisionDate.getTime() - item.getAccessDate().getTime()
  7. >MAX_LIVING_TIME_MILLIS) {
  8. iterator.remove();
  9. }
  10. }
  11. }

最后,还有一个用于关闭缓存的方法,该方法可中断执行CleanCacheTask类的线程,并且返回缓存中存储的元素数。

  1. public void shutdown() {
  2. thread.interrupt();
  3. }
  4. public intgetItemCount() {
  5. return cache.size();
  6. }
  • 日志系统

在本章所有例子中,我们都使用System.out.println()方法将信息反馈到控制台。当你实现的是一个准备在生产环境中执行的企业应用程序时,最好的方法就是使用日志系统记录调试信息和错误信息。log4j是Java中最受欢迎的日志系统。在本例中,我们将实现自己的日志系统,该系统采用了生产者/消费者并发设计模式。使用日志系统的任务将作为生产者,而把日志信息写入文件的特别任务(作为一个线程执行)将作为消费者。该日志系统的组件如下。

  • LogTask:该类实现了日志消费者,它可每10秒钟读取队列中存储的日志消息并将其写入文件。该类通过一个Thread对象来执行。
  • Logger:这是日志系统的主类。它有一个队列,生产者将存入信息,而消费者将读取这些信息。它还提供了一个可将消息加入队列的方法,以及一个获取队列存储的所有消息并将其写入磁盘的方法。
    实现该队列,与缓存系统相同,我们需要采用一种并发数据结构,以避免任何数据不一致的错误。我们有如下两个供选方案。

  • 使用阻塞型数据结构。当队列为满(在我们的例子中,队列永不会满)或者为空时,将会阻塞线程。

  • 使用非阻塞型数据结构。如果队列为满或者为空时,将会返回一个特定值。
    我们选择了一种非阻塞型数据结构,即ConcurrentLinkedQueue类,它实现了Queue接口。我们使用offer()方法将元素插入队列,使用poll()方法从队列中获取元素。

LogTask类的代码非常简单:

  1. public class LogTask implements Runnable {
  2. @Override
  3. public void run() {
  4. try {
  5. while (Thread.currentThread().interrupted()) {
  6. TimeUnit.SECONDS.sleep(10);
  7. Logger.writeLogs();
  8. }
  9. } catch (InterruptedException e) {
  10. }
  11. Logger.writeLogs();
  12. }
  13. }

该类实现了Runnable接口,而且在run()方法中,每10秒钟调用一次Logger类的writeLogs()方法。

Logger类有5个不同的静态方法。首先,用一个静态代码块初始化并启动执行LogTask的线程,并且该线程创建用于存放日志数据的ConcurrentLinkedQueue类:

  1. public class Logger {
  2. private static ConcurrentLinkedQueue<String>logQueue = new
  3. ConcurrentLinkedQueue<String>();
  4. private static Thread thread;
  5. private static final String LOG_FILE = Paths.get("output",
  6. "server.log").toString();
  7. static {
  8. LogTask task = new LogTask();
  9. thread = new Thread(task);
  10. }

然后,Logger类中含有一种sendMessage()方法,该方法接收一个字符串作为参数并且将该消息存放在队列之中。该方法使用offer()方法存放消息:

  1. public static void sendMessage(String message) {
  2. logQueue.offer(new Date()+": "+message);
  3. }

Logger类中的关键方法是writeLogs()类。该方法使用ConcurrentLinkedQueue类的poll()方法获取并删除队列中存储的所有日志消息,并将它们写入文件。

  1. public static void writeLogs() {
  2. String message;
  3. Path path = Paths.get(LOG_FILE);
  4. try (BufferedWriterfileWriter = Files.newBufferedWriter(path,
  5. StandardOpenOption.CREATE,
  6. StandardOpenOption.APPEND)) {
  7. while ((message = logQueue.poll()) != null) {
  8. fileWriter.write(new Date()+": "+message);
  9. fileWriter.newLine();
  10. }
  11. } catch (IOException e) {
  12. e.printStackTrace();
  13. }
  14. }

最后还有两个方法:一个用于删减日志文件,另一个用于结束日志系统的执行器,该方法会中断执行LogTask的线程。

  1. public static void initializeLog() {
  2. Path path = Paths.get(LOG_FILE);
  3. if (Files.exists(path)) {
  4. try (OutputStream out = Files.newOutputStream(path,
  5. StandardOpenOption.TRUNCATE_EXISTING)) {
  6. } catch (IOException e) {
  7. e.printStackTrace();
  8. }
  9. }
  10. thread.start();
  11. }
  12. public static void shutdown() {
  13. thread.interrupt();
  14. }

3.3.4 对比两种解决方案

现在,测试一下串行服务器和并发服务器,观察哪种解决方案会使服务器性能更好。我们实现了四个类进行自动测试,它们可以向服务器发出查询。这些类如下所示。

  • SerialClient:该类实现了一个可用的串行服务器客户端。该客户端产生了9个使用Query消息的请求和一个使用Report消息的查询。该客户端将重复该过程10次,这样就会请求90次Query查询和10次Report查询。
  • MultipleSerialClients:该类模拟了同时存在多个客户端的情况。对于这种情形,我们为每个SerialClient创建一个线程,并且同时运行这些客户端以查看服务器的性能。我们测试了1到5个并发客户端。
  • ConcurrentClient:该类与SerialClient类相似,只不过它调用的是并发服务器而非串行服务器。
  • MultipleConcurrentClients:该类与MultipleSerialClients类相似,只不过它调用的是并发服务器而非串行服务器。

要测试串行服务器,可以按照下述步骤进行。

(1) 启动串行服务器并且等待其初始化。

(2) 启动MultipleSerialClients类,该类首先启动一个SerialClient类,然后依次启动两个、三个、四个,最后启动五个SerialClient类。

对于并发服务器,你可以按照类似过程进行处理。

(1) 启动并发服务器并且等待其初始化。

(2) 启动MultipleConcurrentClients类,该类首先启动一个ConcurrentClient类,然后依次启动两个、三个、四个,最后启动五个ConcurrentClient类。

为比较这两个版本的执行时间,我们使用JMH框架(请查看名为“Code Tools: jmh”的文章)实现了一个微基准测试,该框架支持在Java中实现微型基准测试。使用面向基准测试的框架是一种比较好的解决方案,可直接使用其中的currentTimeMillis()方法或者nanoTime()方法测量时间。我们在两套计算机架构上分别将其执行10次。

  • 一台计算机配置了Intel Core i5-5300 CPU、Windows 7操作系统和16GB的RAM。该处理器有两个核,每个核可以执行两个线程,这样我们就有了四个并行线程。
  • 另一台计算机配置了AMD A8-640 CPU、Windows 10 操作系统和8GB的RAM。该处理器有四个核。

全部执行结果如下。

客户端AMDIntel
串行并发加速比串行并发加速比
14.9704.3911.131.0900.9141.19
29.7135.1541.881.9811.3121.51
314.5656.2442.332.9031.6441.77
419.7517.6762.573.8781.9881.95
524.2128.4342.874.7752.3462.04

上表各单元中给出的是每个客户端的平均时间(单位:秒)。我们可以得出以下结论。

  • 在两种架构上的执行时间差别很大,这需要考虑多方面的因素,例如硬盘、内存和操作系统等都会影响性能。不过,两种架构下得到加速比比较相近。
  • 两种服务器的性能均受向服务器发送请求的并发客户端数量的影响。
  • 在所有情况下,并发版本的执行时间均比串行版本的执行时间更低。

3.3.5 其他重要方法

贯穿本章,我们使用了Java并发API中的一些类实现执行器框架的基础功能。这些类还有其他一些重要方法。在本节,我们将讲解其中一部分。

Executors类提供了其他一些创建ThreadPoolExecutor对象的方法。这些方法有如下几种。

  • newCachedThreadPool():该方法创建了一个ThreadPoolExecutor对象,会重新使用空闲的工作线程,但是如果必要,它也会创建一个新的工作线程。在此并没有最大工作线程数。
  • newSingleThreadExecutor():该方法创建了一个仅使用单个工作线程的ThreadPoolExecutor对象。发送给执行器的任务会存储在一个队列中,直到该工作线程可以执行它们为止。
  • CountDownLatch类额外提供了如下几种方法。
    • await(long timeout, TimeUnit unit):该方法将一直等待,直到内部计数器数值为0并超过参数中指定的时间为止。如果超时,则该方法返回false值。
    • getCount():该方法返回内部计数器的实际值。

Java中有两种类型的并发数据结构。

  • 阻塞型数据结构:当你调用某个方法但是类库无法执行该项操作时(例如,你试图获取某个元素而数据结构是空的),这种结构将阻塞线程直到这些操作可以执行。
  • 非阻塞型数据结构:当你调用某个方法但是类库无法执行该项操作时(因为结构为空或者为满),该方法会返回一个特定值或抛出一个异常。

既有实现上述两种行为的数据结构,也有仅实现其中一种行为的数据结构。通常,阻塞型数据结构也会实现具有非阻塞型行为的方法,而非阻塞型数据结构并不会实现阻塞型方法。

实现阻塞型操作的方法如下。

  • put()putFirst()putLast():这些方法将一个元素插入数据结构。如果该数据结构已满,则会阻塞该线程,直到出现空间为止。
  • take()takeFirst()takeLast():这些方法返回并且删除数据结构中的一个元素。如果该数据结构为空,则会阻塞该线程直到其中有元素为止。

实现非阻塞型操作的方法如下。

  • add()addFirst()addLast():这些方法将一个元素插入数据结构。如果该数据结构已满,则会抛出一个IllegalStateException异常。
  • remove()removeFirst()removeLast():这些方法将返回并且删除数据结构中的一个元素。如果该结构为空,则这些方法将抛出一个IllegalStateException异常。
  • element()getFirst()getLast():这些方法将返回但是不删除数据结构中的一个元素。如果该数据结构为空,则会抛出一个IllegalStateException异常。
  • offer()offerFirst()offerLast():这些方法可以将一个元素插入数据结构。如果该结构已满,则返回一个Booleanfalse
  • poll()pollFirst()pollLast():这些方法将返回并且删除数据结构中的一个元素。如果该结构为空,则返回null值。
  • peek()peekFirst()peekLast():这些方法返回但是并不删除数据结构中的一个元素。如果该数据结构为空,则返回null值。

第11章将更加详细地讲述并发数据结构。