3.3 第二个例子:客户端/服务器环境下的并发处理
客户端/服务器模型是一种软件架构,基于这种模型的应用程序被划分为两个部分:提供资源(数据、操作、打印机、存储等)的服务器端和使用服务器端所提供的资源的客户端。传统上,这种架构用于企业领域,但是随着互联网的蓬勃发展,至今它仍然是一个很现实的主题。你也可以将Web应用程序视为客户端/服务器应用程序,其服务器部分就是在Web服务器中执行的应用程序后台部分,而Web浏览器则执行应用程序客户端部分。SOA(service-oriented architecture,面向服务的架构)是客户端/服务器架构的另一个例子,其中公开的Web服务即为其服务器部分,而使用这些服务的各种客户端则是客户端部分。
在客户端/服务器环境中,我们通常都有一台服务器和很多使用该服务器所提供服务的客户端,因此设计这样的系统时,服务器的性能方面非常重要。
在本节,我们将实现一个简单的客户端/服务器应用程序。它将对某银行发布的发展指数进行数据搜索。
该服务器主要有以下特点。
- 客户端与服务器都使用套接字连接。
- 客户端将以字符串形式发送查询,而服务器将用另一个字符串返回结果。
- 服务器可以响应三种不同查询。
Query:这种查询的格式是q;codCountry;codIndicator;year,其中codCountry是国家代码,codIndicator是指数代码,而year是一个可选参数,表示你想要查询的年份。服务器的响应信息将以单个字符串的形式返回。Report:这种查询的格式是r;codIndicator,其中codIndicator是你要制表的指数代码。服务器将以单个字符串形式响应各年份所有国家该指数的平均值。Stop:这种查询的格式是z;接收到该命令时,服务器将停止执行。
- 在其他情况下,服务器将返回一个错误消息。
与前面的例子相同,下文将展示如何实现该客户端/服务器应用程序的串行版本。然后,将展示如何使用执行器实现其并发版本。最后,我们将比较这两种解决方案,以审视在这种情况下使用并发处理的优点。
3.3.1 客户端/服务器:串行版
服务器应用程序的串行版本主要有三个部件。
- DAO(data access object,数据访问对象)部件,负责访问数据并且获取查询结果。
- 命令部件,由各种查询的命令组成。
- 服务器部件,接收查询,调用对应命令,并且向客户端返回结果。
下面仔细了解一下上述部件。
- DAO部件
正如我们前面提到的,服务器将针对发展指数进行数据搜索。该数据以CSV文件存放。该应用程序中的DAO组件将整个文件加载到内存的一个List对象中。它为涉及的每个查询都实现一个方法,而这些方法通过搜索该列表查找数据。
在此我们不介绍这个类的代码,因为很容易实现,而且它也不是本书所要讲述的重点内容。
- 命令部件
命令部件是DAO部件和服务器部件之间的中介。我们实现了一个基本抽象类Command,它是所有命令的基类。
public abstract class Command {protected final String[] command;public Command (String [] command) {this.command=command;}public abstract String execute ();}
然后,我们为每一个查询实现了一条命令。查询在QueryCommand类中实现,其execute()方法如下所示:
public String execute() {WDIDAOdao=WDIDAO.getDAO();if (command.length==3) {return dao.query(command[1], command[2]);} else if (command.length==4) {try {return dao.query(command[1], command[2],Short.parseShort(command[3]));} catch (Exception e) {return "ERROR;Bad Command";}} else {return "ERROR;Bad Command";}}
Report查询在ReportCommand中实现,其execute()方法如下所示:
@Overridepublic String execute() {WDIDAOdao=WDIDAO.getDAO();return dao.report(command[1]);}
Stop查询在StopCommand类中实现,其execute()方法如下所示:
@Overridepublic String execute() {return "Server stopped";}
最后,出错的情况通过ErrorCommand类处理,其execute()方法如下所示:
@Overridepublic String execute() {return "Unknown command: "+command[0];}
- 服务器部件
最后,服务器部件在SerialServer类中实现。首先,它通过调用getDAO()方法初始化DAO。其主要目的是使用DAO加载所有数据。
public class SerialServer {public static void main(String[] args) throws IOException {WDIDAOdao = WDIDAO.getDAO();booleanstopServer = false;System.out.println("Initialization completed.");try (ServerSocketserverSocket = new ServerSocket(Constants.SERIAL_PORT)) {
此后,我们需要执行一个循环,直到该服务器接收到一个Stop查询为止。该循环执行以下四步。
- 接收来自客户端的查询。
- 解析并分割该查询的要素。
- 调用对应的命令。
- 向客户端返回结果。
这四个步骤的实现如下述代码片段所示:
do {try (Socket clientSocket = serverSocket.accept();PrintWriter out = new PrintWriter(clientSocket.getOutputStream(),true);BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {String line = in.readLine();Command command;String[] commandData = line.split(";");System.out.println("Command: " + commandData[0]);switch (commandData[0]) {case "q":System.out.println("Query");command = new QueryCommand(commandData);break;case "r":System.out.println("Report");command = new ReportCommand(commandData);break;case "z":System.out.println("Stop");command = new StopCommand(commandData);stopServer = true;break;default:System.out.println("Error");command = new ErrorCommand(commandData);}String response = command.execute();System.out.println(response);} catch (IOException e) {e.printStackTrace();}} while (!stopServer);
3.3.2 客户端/服务器:并行版本
在串行版本中,服务器部件存在一个非常严重的缺陷。当处理一个查询时并不能兼顾其他查询。如果响应每个查询请求或特定请求需要耗费大量时间,那么服务器的性能就会很低。
我们可以使用并发处理获得更好的性能。如果服务器收到请求后创建了一个线程,它可以将该查询的所有处理委托该线程,并开始处理新的请求。这种方法也存在一些问题。如果我们接收到了大量查询,则会导致系统因创建太多线程而不堪重负。但是如果我们使用线程数固定的执行器,就可以控制服务器所使用的资源,并获得比串行版本更好的性能。
为了使用执行器将串行版服务器部件转换为并发版,必须修改服务器端。DAO部件是相同的,虽然我们也已经更改了实现命令部件的类名,但是实现过程几乎相同。只有Stop查询发生了改变,因为现在它有了更多职能。下面我们了解一下并发版服务器部件的实现细节。
- 服务器部件
并发服务器部件在ConcurrentServer部件中实现。我们为其加入了两项串行服务器不具备的要素:在ParallelCache类中实现的缓存系统和在Logger类中实现的日志系统。首先,并发服务器调用getDAO()方法初始化DAO部分。主要目标是DAO加载所有数据并且使用Executors类的newFixedThreadPool()方法创建一个ThreadPoolExecutor对象。该方法接收的是我们要在服务器中使用的最大工作线程数。执行器绝不会超过该工作线程数。要获得工作线程数,我们要使用Runtime类的availableProcessors()方法获取系统的核数。
public class ConcurrentServer {private static ThreadPoolExecutor executor;private static ParallelCache cache;private static ServerSocketserverSocket;private static volatileboolean stopped=false;public static void main(String[] args) {serverSocket=null;WDIDAOdao=WDIDAO.getDAO();executor=(ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());cache=new ParallelCache();Logger.initializeLog();System.out.println("Initialization completed.");
布尔变量stopped被声明为volatile型,因为另一个线程可以更改它。当stopped变量被另一个线程设置为true时,volatile关键字确保了这一改变在主方法中可见。如果没有volatile关键字,由于CPU缓存或者编译优化方面的原因,这样的改变则不可见。然后,我们初始化ServerSocket以监听请求:
serverSocket = new ServerSocket(Constants.CONCURRENT_PORT);
我们不能使用一个try-with-resources语句管理服务器socket。当我们收到Stop命令后需要关闭服务器,但是服务器正在等待serverSocket对象的accept()方法。为了迫使服务器丢弃该方法,需要显式关闭服务器(我们将在shutdown()方法中执行这一操作),因此不能使用try-with-resources语句关闭socket。
此后,我们会执行一个循环,直到服务器接收到一个Stop查询为止。该循环执行如下三个步骤。
- 从客户端接收一个查询。
- 创建一个任务处理该查询。
- 将该任务发送给执行器。
下面的代码片段展示了这三个步骤:
do {try {Socket clientSocket = serverSocket.accept();RequestTask task = new RequestTask(clientSocket);executor.execute(task);} catch (IOException e) {e.printStackTrace();}} while (!stopped);
最后,一旦服务器完成了执行(离开了该循环),则必须使用awaitTermination()方法结束该执行器。该执行器完成execution()方法前,该方法将一直阻塞主线程。然后,关闭缓存系统,等待一条表明服务器执行完毕的消息,如下所示:
executor.awaitTermination(1, TimeUnit.DAYS);System.out.println("Shutting down cache");cache.shutdown();System.out.println("Cache ok");System.out.println("Main server thread ended");
我们已经额外加入了两种方法:一种是getExecutor()方法,它返回了用于执行并发任务的ThreadPoolExecutor对象;另一种是shutdown()方法,它用于按照顺序结束服务器的执行器,该方法调用了执行器的shutdown()方法并且关闭ServerSocket。
public static void shutdown() {stopped = true;System.out.println("Shutting down the server...");System.out.println("Shutting down executor");executor.shutdown();System.out.println("Executor ok");System.out.println("Closing socket");try {serverSocket.close();System.out.println("Socket ok");} catch (IOException e) {e.printStackTrace();}System.out.println("Shutting down logger");Logger.sendMessage("Shutting down the logger");Logger.shutdown();System.out.println("Logger ok");}
在并发服务器中有一个关键部件:处理每个客户端请求的RequestTask类。该类实现了Runnable接口,这样它就可以以并发方式在执行器中执行。其构造函数将接收用于与客户端通信的Socket参数。
public class RequestTask implements Runnable {private final Socket clientSocket;public RequestTask(Socket clientSocket) {this.clientSocket = clientSocket;}
响应每个请求时,run()方法所完成的操作与串行服务器所做的操作相同。
- 接收客户端查询。
- 解析并分割该查询的要素。
- 调用相应的命令。
- 向客户端返回结果。
其代码片段如下所示:
public void run() {try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {String line = in.readLine();Logger.sendMessage(line);ParallelCache cache = ConcurrentServer.getCache();String ret = cache.get(line);if (ret == null) {Command command;String[] commandData = line.split(";");System.out.println("Command: " + commandData[0]);switch (commandData[0]) {case "q":System.err.println("Query");command = new ConcurrentQueryCommand(commandData);break;case "r":System.err.println("Report");command = new ConcurrentReportCommand(commandData);break;case "s":System.err.println("Status");command = new ConcurrentStatusCommand(commandData);break;case "z":System.err.println("Stop");command = new ConcurrentStopCommand(commandData);break;default:System.err.println("Error");command = new ConcurrentErrorCommand(commandData);break;}ret = command.execute();if (command.isCacheable()) {cache.put(line, ret);}} else {Logger.sendMessage("Command "+line+" was found in the cache");}System.out.println(ret);} catch (Exception e) {e.printStackTrace();} finally {try {clientSocket.close();} catch (IOException e) {e.printStackTrace();}}}
- 命令部件
正如前面的代码片段所示,我们重命名了命令部件中的所有类。除了ConcurrentStopCommand类之外,其他实现过程都相同。现在,命令部件调用ConcurrentServer类的shutdown()方法按照顺序结束服务器的执行。execute()方法的源代码如下:
@Overridepublic String execute() {ConcurrentServer.shutdown();return "Server stopped";}
同样,现在Command类包含了一个新的Boolean型的isCacheable()方法,如果缓存中存放了命令的结果,则该方法返回true,否则返回false。
3.3.3 额外的并发服务器组件
我们已经实现了一些额外的并发服务器组件:返回服务器状态信息的新命令,存储命令执行结果以便在重复请求时节省时间的缓存系统,以及记录错误信息和调试信息的日志系统。接下来将介绍这些组件。
- 状态命令
首先,我们有了一种新的查询。它有自己的格式,并且通过ConcurrentStatusCommand类处理。该类可获取服务器所使用的ThreadPoolExecutor对象,并且获取该执行器的相关状态信息:
public class ConcurrentStatusCommand extends Command {public ConcurrentStatusCommand (String[] command) {super(command);setCacheable(false);}@Overridepublic String execute() {StringBuildersb=new StringBuilder();ThreadPoolExecutor executor=ConcurrentServer.getExecutor();Logger.sendMessage(sb.toString());return sb.toString();}}
可以从该服务器获取的信息如下。
getActiveCount():该方法返回执行并发任务的大致任务数。线程池中可能有更多线程,但是它们都是空闲的。getMaximumPoolSize():该方法返回了执行器可拥有的工作线程的最大数目。getCorePoolSize():该方法返回了执行器拥有的核心工作线程数目。这个数字决定了线程池中线程数的最小值。getPoolSize():该方法返回了当前线程池中的线程数。getLargestPoolSize():该方法返回了线程池在执行期间的最大线程数。getCompletedTaskCount():该方法返回了执行器已经执行的任务数。getTaskCount():该方法返回了已预定执行任务的大致数目。getQueue().size():该方法返回了在任务队列中等待的任务数。
因为使用Executor类的newFixedThreadPool()方法创建了执行器,那么它的最大工作线程数和核心工作线程数相同。
- 缓存系统
并行服务器中带有一个缓存系统,其作用是避免重复搜索那些近期已经进行过的数据。该缓存系统有如下三个要素。
CacheItem类:该类用于描述在缓存中存放的每个元素,而且它有如下四个属性。- 在缓存中存储的命令。我们将
Query和Report命令存放在缓存之中。 - 该命令所产生的响应。
- 缓存中某一项的创建日期。
- 该项在缓存中的最后访问时间。
- 在缓存中存储的命令。我们将
CleanCacheTask类:如果在缓存中存储所有命令并从未删除,那么缓存的大小就会无限制增加。为了避免这种情况,我们还可以创建一个任务删除缓存中的元素,并将该任务作为一个Thread对象实现。有如下两种供选方案。- 你可以为缓存设定最大规模。如果缓存中的元素数大于最大值,就可以将那些近期很少访问的元素删除。
- 你可以删除缓存中那些在某个预定时段内未被访问的元素。我们将要采用的就是这种方式。
ParallelCache类:该类实现了在缓存中存储和检索各元素的操作。为了在缓存中存储数据,我们采用了一种ConcurrentHashMap数据结构。因为缓存由服务器所有任务共享,我们必须采用一种同步机制保护对缓存的访问,以避免数据竞争条件。有如下三种供选方案。- 我们可以使用一种
non-synchronized型的数据结构(例如HashMap)并且加入必要代码同步对该数据结构的各种访问,例如,采用锁。你也可以使用Collections类的synchronizedMap()方法将一个HashMap转换为一个synchronized型结构。 - 使用
synchronized型的数据结构,例如Hashtable。对于这种情况,我们不会形成数据竞争条件,但是性能会更好。 - 使用并发数据结构,例如
ConcurrentHashMap类,该类消除了出现数据竞争条件的可能性,而且该类被优化用于高并发环境中。我们将使用ConcurrentHashMap类的对象实现这种方案。CleanCacheTask类的代码如下:
- 我们可以使用一种
public class CleanCacheTask implements Runnable {private final ParallelCache cache;public CleanCacheTask(ParallelCache cache) {this.cache = cache;}@Overridepublic void run() {try {while (!Thread.currentThread().interrupted()) {TimeUnit.SECONDS.sleep(10);cache.cleanCache();}} catch (InterruptedException e) {}}}
该类中有一个ParallelCache对象,并且每10秒钟,就会执行ParallelCache实例的cleanCache()方法。ParallelCache类有五种不同的方法。首先,该类的构造函数初始化了该缓存的元素。它创建了ConcurrentHashMap对象并且启动了一个执行CleanCacheTask类的线程:
public class ParallelCache {private final ConcurrentHashMap<String, CacheItem> cache;private final CleanCacheTask task;private final Thread thread;public static intMAX_LIVING_TIME_MILLIS = 600_000;public ParallelCache() {cache=new ConcurrentHashMap<>();task=new CleanCacheTask(this);thread=new Thread(task);thread.start();}
然后,该类中还有两个方法可用于存储和检索缓存中的元素。我们使用put()方法将元素插入HashMap,并使用get()方法在HashMap中检索元素:
public void put(String command, String response) {CacheItem item = new CacheItem(command, response);cache.put(command, item);}public String get (String command) {CacheItem item=cache.get(command);if (item==null) {return null;}item.setAccessDate(new Date());return item.getResponse();}
然后,CleanCacheTask类清理缓存的方法如下:
public void cleanCache() {Date revisionDate = new Date();Iterator<CacheItem> iterator = cache.values().iterator();while (iterator.hasNext()) {CacheItem item = iterator.next();if (revisionDate.getTime() - item.getAccessDate().getTime()>MAX_LIVING_TIME_MILLIS) {iterator.remove();}}}
最后,还有一个用于关闭缓存的方法,该方法可中断执行CleanCacheTask类的线程,并且返回缓存中存储的元素数。
public void shutdown() {thread.interrupt();}public intgetItemCount() {return cache.size();}
- 日志系统
在本章所有例子中,我们都使用System.out.println()方法将信息反馈到控制台。当你实现的是一个准备在生产环境中执行的企业应用程序时,最好的方法就是使用日志系统记录调试信息和错误信息。log4j是Java中最受欢迎的日志系统。在本例中,我们将实现自己的日志系统,该系统采用了生产者/消费者并发设计模式。使用日志系统的任务将作为生产者,而把日志信息写入文件的特别任务(作为一个线程执行)将作为消费者。该日志系统的组件如下。
LogTask:该类实现了日志消费者,它可每10秒钟读取队列中存储的日志消息并将其写入文件。该类通过一个Thread对象来执行。Logger:这是日志系统的主类。它有一个队列,生产者将存入信息,而消费者将读取这些信息。它还提供了一个可将消息加入队列的方法,以及一个获取队列存储的所有消息并将其写入磁盘的方法。
实现该队列,与缓存系统相同,我们需要采用一种并发数据结构,以避免任何数据不一致的错误。我们有如下两个供选方案。使用阻塞型数据结构。当队列为满(在我们的例子中,队列永不会满)或者为空时,将会阻塞线程。
- 使用非阻塞型数据结构。如果队列为满或者为空时,将会返回一个特定值。
我们选择了一种非阻塞型数据结构,即ConcurrentLinkedQueue类,它实现了Queue接口。我们使用offer()方法将元素插入队列,使用poll()方法从队列中获取元素。
LogTask类的代码非常简单:
public class LogTask implements Runnable {@Overridepublic void run() {try {while (Thread.currentThread().interrupted()) {TimeUnit.SECONDS.sleep(10);Logger.writeLogs();}} catch (InterruptedException e) {}Logger.writeLogs();}}
该类实现了Runnable接口,而且在run()方法中,每10秒钟调用一次Logger类的writeLogs()方法。
Logger类有5个不同的静态方法。首先,用一个静态代码块初始化并启动执行LogTask的线程,并且该线程创建用于存放日志数据的ConcurrentLinkedQueue类:
public class Logger {private static ConcurrentLinkedQueue<String>logQueue = newConcurrentLinkedQueue<String>();private static Thread thread;private static final String LOG_FILE = Paths.get("output","server.log").toString();static {LogTask task = new LogTask();thread = new Thread(task);}
然后,Logger类中含有一种sendMessage()方法,该方法接收一个字符串作为参数并且将该消息存放在队列之中。该方法使用offer()方法存放消息:
public static void sendMessage(String message) {logQueue.offer(new Date()+": "+message);}
Logger类中的关键方法是writeLogs()类。该方法使用ConcurrentLinkedQueue类的poll()方法获取并删除队列中存储的所有日志消息,并将它们写入文件。
public static void writeLogs() {String message;Path path = Paths.get(LOG_FILE);try (BufferedWriterfileWriter = Files.newBufferedWriter(path,StandardOpenOption.CREATE,StandardOpenOption.APPEND)) {while ((message = logQueue.poll()) != null) {fileWriter.write(new Date()+": "+message);fileWriter.newLine();}} catch (IOException e) {e.printStackTrace();}}
最后还有两个方法:一个用于删减日志文件,另一个用于结束日志系统的执行器,该方法会中断执行LogTask的线程。
public static void initializeLog() {Path path = Paths.get(LOG_FILE);if (Files.exists(path)) {try (OutputStream out = Files.newOutputStream(path,StandardOpenOption.TRUNCATE_EXISTING)) {} catch (IOException e) {e.printStackTrace();}}thread.start();}public static void shutdown() {thread.interrupt();}
3.3.4 对比两种解决方案
现在,测试一下串行服务器和并发服务器,观察哪种解决方案会使服务器性能更好。我们实现了四个类进行自动测试,它们可以向服务器发出查询。这些类如下所示。
SerialClient:该类实现了一个可用的串行服务器客户端。该客户端产生了9个使用Query消息的请求和一个使用Report消息的查询。该客户端将重复该过程10次,这样就会请求90次Query查询和10次Report查询。MultipleSerialClients:该类模拟了同时存在多个客户端的情况。对于这种情形,我们为每个SerialClient创建一个线程,并且同时运行这些客户端以查看服务器的性能。我们测试了1到5个并发客户端。ConcurrentClient:该类与SerialClient类相似,只不过它调用的是并发服务器而非串行服务器。MultipleConcurrentClients:该类与MultipleSerialClients类相似,只不过它调用的是并发服务器而非串行服务器。
要测试串行服务器,可以按照下述步骤进行。
(1) 启动串行服务器并且等待其初始化。
(2) 启动MultipleSerialClients类,该类首先启动一个SerialClient类,然后依次启动两个、三个、四个,最后启动五个SerialClient类。
对于并发服务器,你可以按照类似过程进行处理。
(1) 启动并发服务器并且等待其初始化。
(2) 启动MultipleConcurrentClients类,该类首先启动一个ConcurrentClient类,然后依次启动两个、三个、四个,最后启动五个ConcurrentClient类。
为比较这两个版本的执行时间,我们使用JMH框架(请查看名为“Code Tools: jmh”的文章)实现了一个微基准测试,该框架支持在Java中实现微型基准测试。使用面向基准测试的框架是一种比较好的解决方案,可直接使用其中的currentTimeMillis()方法或者nanoTime()方法测量时间。我们在两套计算机架构上分别将其执行10次。
- 一台计算机配置了Intel Core i5-5300 CPU、Windows 7操作系统和16GB的RAM。该处理器有两个核,每个核可以执行两个线程,这样我们就有了四个并行线程。
- 另一台计算机配置了AMD A8-640 CPU、Windows 10 操作系统和8GB的RAM。该处理器有四个核。
全部执行结果如下。
| 客户端 | AMD | Intel | ||||
|---|---|---|---|---|---|---|
| 串行 | 并发 | 加速比 | 串行 | 并发 | 加速比 | |
| 1 | 4.970 | 4.391 | 1.13 | 1.090 | 0.914 | 1.19 |
| 2 | 9.713 | 5.154 | 1.88 | 1.981 | 1.312 | 1.51 |
| 3 | 14.565 | 6.244 | 2.33 | 2.903 | 1.644 | 1.77 |
| 4 | 19.751 | 7.676 | 2.57 | 3.878 | 1.988 | 1.95 |
| 5 | 24.212 | 8.434 | 2.87 | 4.775 | 2.346 | 2.04 |
上表各单元中给出的是每个客户端的平均时间(单位:秒)。我们可以得出以下结论。
- 在两种架构上的执行时间差别很大,这需要考虑多方面的因素,例如硬盘、内存和操作系统等都会影响性能。不过,两种架构下得到加速比比较相近。
- 两种服务器的性能均受向服务器发送请求的并发客户端数量的影响。
- 在所有情况下,并发版本的执行时间均比串行版本的执行时间更低。
3.3.5 其他重要方法
贯穿本章,我们使用了Java并发API中的一些类实现执行器框架的基础功能。这些类还有其他一些重要方法。在本节,我们将讲解其中一部分。
Executors类提供了其他一些创建ThreadPoolExecutor对象的方法。这些方法有如下几种。
newCachedThreadPool():该方法创建了一个ThreadPoolExecutor对象,会重新使用空闲的工作线程,但是如果必要,它也会创建一个新的工作线程。在此并没有最大工作线程数。newSingleThreadExecutor():该方法创建了一个仅使用单个工作线程的ThreadPoolExecutor对象。发送给执行器的任务会存储在一个队列中,直到该工作线程可以执行它们为止。CountDownLatch类额外提供了如下几种方法。await(long timeout, TimeUnit unit):该方法将一直等待,直到内部计数器数值为0并超过参数中指定的时间为止。如果超时,则该方法返回false值。getCount():该方法返回内部计数器的实际值。
Java中有两种类型的并发数据结构。
- 阻塞型数据结构:当你调用某个方法但是类库无法执行该项操作时(例如,你试图获取某个元素而数据结构是空的),这种结构将阻塞线程直到这些操作可以执行。
- 非阻塞型数据结构:当你调用某个方法但是类库无法执行该项操作时(因为结构为空或者为满),该方法会返回一个特定值或抛出一个异常。
既有实现上述两种行为的数据结构,也有仅实现其中一种行为的数据结构。通常,阻塞型数据结构也会实现具有非阻塞型行为的方法,而非阻塞型数据结构并不会实现阻塞型方法。
实现阻塞型操作的方法如下。
put()、putFirst()、putLast():这些方法将一个元素插入数据结构。如果该数据结构已满,则会阻塞该线程,直到出现空间为止。take()、takeFirst()、takeLast():这些方法返回并且删除数据结构中的一个元素。如果该数据结构为空,则会阻塞该线程直到其中有元素为止。
实现非阻塞型操作的方法如下。
add()、addFirst()、addLast():这些方法将一个元素插入数据结构。如果该数据结构已满,则会抛出一个IllegalStateException异常。remove()、removeFirst()、removeLast():这些方法将返回并且删除数据结构中的一个元素。如果该结构为空,则这些方法将抛出一个IllegalStateException异常。element()、getFirst()、getLast():这些方法将返回但是不删除数据结构中的一个元素。如果该数据结构为空,则会抛出一个IllegalStateException异常。offer()、offerFirst()、offerLast():这些方法可以将一个元素插入数据结构。如果该结构已满,则返回一个Boolean值false。poll()、pollFirst()、pollLast():这些方法将返回并且删除数据结构中的一个元素。如果该结构为空,则返回null值。peek()、peekFirst()、peekLast():这些方法返回但是并不删除数据结构中的一个元素。如果该数据结构为空,则返回null值。
第11章将更加详细地讲述并发数据结构。
