4.2 第一个例子:高级服务器应用程序
第3章介绍了一个客户端/服务器应用程序的例子。本节实现了一个服务器,针对3.3节例子中的发展指数进行数据搜索,并且实现了一个客户端,多次调用该服务器,以便测试执行器的性能。
本节将扩展这个例子,为其加入几个特性,如下所示。
- 可以使用新的撤销型查询撤销服务器上执行的查询。
- 可以使用优先级参数控制查询执行的顺序。具有较高优先级的任务将优先执行。
- 服务器将计算任务的数量以及使用该服务器各用户的总执行时间。
为了实现这些新特征,对服务器做了以下改动。
- 为每个查询增加了两个参数。第一个参数是发送查询的用户名,而另一个则是查询的优先级。查询的新格式有如下几种。
Query查询:格式为q;username;priority;codCountry;codIndicator;year,其中,username是用户名,priority是查询优先级,codCountry是国家代码,codIndicator是指数代码,year是可选参数,表示想要查询的年份。Report查询:格式为r;username;priority;codIndicator,其中,username是用户名,priority是查询优先级,codIndicator是你想要制表的指数代码。Status查询:格式为s;username;priority,其中,username是用户名,priority是查询的优先级。Stop查询:格式为z;username;priority,其中,username是用户名,priority是查询的优先级。
- 还实现了一种新查询,如下所示。
Cancel查询:格式为c;username;priority,其中,username是用户名,priority是查询的优先级。
- 实现了自己的执行器进行如下操作。
- 统计每个用户对服务器的使用情况。
- 按照优先级执行任务。
- 控制任务的拒绝。
- 修改了
ConcurrentServer和RequestTask类以适应服务器的新要素。
服务器的其他要素(缓存系统、日志系统和DAO类等)都相同,因此不再赘述。
4.2.1 ServerExecutor类
如前所述,我们实现了自己的执行器执行服务器任务,也实现了一些额外的但是必要的类用以提供所有功能。现在介绍一下这些类。
- 统计对象
该服务器将计算每个用户在服务器上执行的任务数量,以及这些任务的总执行时间。为了存储这样的数据,我们实现了ExecutorStatistics类。它有两个用于存储这些信息的属性。
public class ExecutorStatistics {private AtomicLong executionTime = new AtomicLong(0L);private AtomicInteger numTasks = new AtomicInteger(0);
这些属性都是支持在单个变量上进行原子操作的AtomicVariables型变量,可以在不同的线程中使用这些变量,而且不需要采用任何同步机制。然后,该类还有两个方法可以分别增加任务数和执行时间。
public void addExecutionTime(long time) {executionTime.addAndGet(time);}public void addTask() {numTasks.incrementAndGet();}
最后,我们还增加了可获取上述两个属性值的方法,而且重载了toString()方法以可读方式获取信息。
@Overridepublic String toString() {return "Executed Tasks: "+ getNumTasks()+". Execution Time: "+getExecutionTime();}
- 被拒绝任务控制器
创建执行器时,可以指定一个类用以管理其拒绝的任务。如果在执行器已调用shutdown()或shutdownNow()方法之后提交任务,则该任务会被执行器拒绝。
为了控制这种情况,我们实现了RejectedTaskController类。该类实现了RejectedExecutionHandler接口和rejectedExecution()方法。
public class RejectedTaskController implementsRejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable task, ThreadPoolExecutorexecutor) {ConcurrentCommand command=(ConcurrentCommand)task;try (Socket clientSocket=command.getSocket();PrintWriter out = new PrintWriter(clientSocket.getOutputStream(),true);) {String message="The server is shutting down."+" Your request can not be served."+" Shutting Down: "+String.valueOf(executor.isShutdown()) + ". Terminated: "+String.valueOf(executor.isTerminated())+ ". Terminating: "+String.valueOf(executor.isTerminating());System.out.println(message);} catch (IOException e) {e.printStackTrace();}}
每个被拒绝的任务都要调用一次rejectedExecution()方法,而该方法将接收被拒绝的任务和拒绝该任务的执行器作为参数。
- 执行器任务
向执行器提交Runnable对象时,它并不会直接执行该对象,而是创建一个新的对象,即FutureTask类的一个实例,而且这项任务由执行器的工作线程执行。
在我们的例子中,为了度量任务的执行时间,我们在ServerTask类中实现了自己的FutureTask类。该类扩展了FutureTask类并且实现了Comparable接口,如下所示:
public class ServerTask<V> extends FutureTask<V> implementsComparable<ServerTask<V>>{
从内部看,该类存储了将作为ConcurrentCommand对象执行的查询。
private ConcurrentCommand command;
在构造函数中,该类使用FutureTask类的构造函数并且存储了ConcurrentCommand对象。
public ServerTask(ConcurrentCommand command) {super(command, null);this.command=command;}public ConcurrentCommand getCommand() {return command;}public void setCommand(ConcurrentCommand command) {this.command = command;}
最后,该类还实现了compareTo()操作,用于比较两个ServerTask实例存储的命令,如下所示:
@Overridepublic int compareTo(ServerTask<V> other) {return command.compareTo(other.getCommand());}
- 执行器
既然有了执行器的辅助类,那么必须实现执行器本身。我们实现了针对这一用途的ServerExecutor类,它扩展了ThreadPoolExecutor类并且还有一些内部属性,如下所示。
startTimes:这是用于存储每个任务开始日期的程序代码ConcurrentHashMap,其键为ServerTask对象(一个Runnable对象),而其值为Date对象。executionStatistics:这是用于存储每个用户使用情况统计的ConcurrentHashMap,其键为用户名,而其值为ExecutorStatistics对象。CORE_POOL_SIZE、MAXIMUM_POOL_SIZE和KEEP_ALIVE_TIME:这些是用于定义执行器特征的常量。REJECTED_TASK_CONTROLLER:这是一个RejectedTaskController类的属性,用于控制执行器拒绝的任务。
这可以通过以下代码解释。
public class ServerExecutor extends ThreadPoolExecutor {private ConcurrentHashMap<Runnable, Date> startTimes;private ConcurrentHashMap<String, ExecutorStatistics>executionStatistics;private static int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();private static int MAXIMUM_POOL_SIZE = Runtime.getRuntime().availableProcessors();private static long KEEP_ALIVE_TIME = 10;private static RejectedTaskController REJECTED_TASK_CONTROLLER= new RejectedTaskController();public ServerExecutor() {super(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME,TimeUnit.SECONDS, new PriorityBlockingQueue<>(),REJECTED_TASK_CONTROLLER);startTimes = new ConcurrentHashMap<>();executionStatistics = new ConcurrentHashMap<>();}
该类的构造函数调用父类的构造函数,创建了一个PriorityBlockingQueue类,用于存储那些将在执行器中执行的任务。该类根据compareTo()方法的执行结果对元素进行排序(因此其中存储的元素必须实现Comparable接口)。该类的实现将允许我们按照优先级执行任务。
然后,重载了ThreadPoolExecutor类的一些方法。首先是beforeExecute()方法。该方法在每个任务执行之前执行。它接收ServerTask对象和执行该任务的线程作为参数。在本例中,将实际日期存放于ConcurrentHashMap,这样每个任务的起始日期如下所示。
protected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);startTimes.put(r, new Date());}
下一个方法是afterExecute()方法。该方法在执行器的每个任务执行完毕后执行,并接收已执行的ServerTask对象和Throwable对象,其中该方法将已执行的ServerTask对象作为参数。作为最后一个参数只有任务执行抛出异常时才会有值。在我们的例子中,将使用该方法进行如下操作。
(1) 计算任务的执行时间。
(2) 按照下述方式更新用户的统计信息。
@Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);ServerTask<?> task=(ServerTask<?>)r;ConcurrentCommand command=task.getCommand();if (t==null) {if (!task.isCancelled()) {Date startDate = startTimes.remove(r);Date endDate=new Date();long executionTime= endDate.getTime() -startDate.getTime();ExecutorStatistics statistics = executionStatistics.computeIfAbsent (command.getUsername(),n -> new ExecutorStatistics());statistics.addExecutionTime(executionTime);statistics.addTask();ConcurrentServer.finishTask (command.getUsername(),command);}else {String message="The task" + command.hashCode() + "ofuser" + command.getUsername() + "hasbeen cancelled.";Logger.sendMessage(message);}} else {String message="The exception "+t.getMessage()+" hasbeen thrown.";Logger.sendMessage(message);}}
最后重载newTaskFor()方法。该方法执行后会转换发送给执行器的Runnable对象,使用执行器待执行的FutureTask实例中的submit()方法。在我们的例子中,使用ServerTask对象替代默认的FutureTask对象。
@Overrideprotected <T> RunnableFuture<T> newTaskFor(Runnable runnable, Tvalue) {return new ServerTask<T>(runnable);}
我们在执行器中引入了一种额外方法,将执行器中存储的所有统计信息写入日志系统。稍后你将看到,该方法将在服务器执行结束时被调用,代码如下所示:
public void writeStatistics() {for(Entry<String, ExecutorStatistics> entry: executionStatistics.entrySet()) {String user = entry.getKey();ExecutorStatistics stats = entry.getValue();Logger.sendMessage(user+":"+stats);}}
4.2.2 命令类
命令类执行发送给服务器的各种查询。可以向服务器发送以下5种查询。
Query查询:这种查询用于获取有关某个国家、某个指数以及某个年份(可选)的信息,通过ConcurrentQueryCommand类实现。Report查询:这种查询用于获取有关某个指数的信息,通过ConcurrentReportCommand类实现。Status查询:这种查询用于获取服务器状态的信息,通过ConcurrentStatusCommand类实现。Cancel查询:这种查询用于撤销某一用户任务的执行,通过ConcurrentCancelCommand类实现。Stop查询:这种查询用于停止服务器的执行,通过ConcurrentStopCommand类实现。
我们还有ConcurrentErrorCommand类和ConcurrentCommand类,前一种类用于管理某一未知命令到达服务器的情况,后一种类是所有命令的基类。
ConcurrentCommand类
这是每个命令的基类,包含所有命令的共性行为,如下所述。
- 调用实现每个命令特定逻辑的方法。
- 将结果写入客户端。
- 关闭在通信过程中使用的所有资源。
该类扩展了Command类并且实现了Comparable接口和Runnable接口。在第3章的例子中,命令都是较为简单的类,但在本例中,并发命令都是将要发送给执行器的Runnable对象。
public abstract class ConcurrentCommand extends Command implementsComparable<ConcurrentCommand>, Runnable{
该类有以下三个属性。
username:该属性用于存储发送该查询的用户名称。priority:该属性用于存储查询的优先级,它将决定查询的执行顺序。socket:该属性表示的是用于与客户端通信的套接字。
该类的构造函数中对这三个属性进行了初始化。
private String username;private byte priority;private Socket socket;public ConcurrentCommand(Socket socket, String[] command) {super(command);username=command[1];priority=Byte.parseByte(command[2]);this.socket=socket;}
该类的主要功能位于抽象方法execute()和run()方法中。其中,每个具体命令都通过实现execute()方法计算和返回查询的结果。run()方法调用execute()方法,将结果存储在缓存,写入套接字中,并且关闭在通信中使用的所有资源,代码如下所示:
@Overridepublic abstract String execute();@Overridepublic void run() {String message="Running a Task: Username: "+username+";Priority: "+priority;Logger.sendMessage(message);String ret=execute();ParallelCache cache = ConcurrentServer.getCache();if (isCacheable()) {cache.put(String.join(";",command), ret);}try (PrintWriter out = new PrintWriter(socket.getOutputStream(),true);) {System.out.println(ret);} catch (IOException e) {e.printStackTrace();}System.out.println(ret);}
最后,compareTo()方法使用其优先级属性确定任务执行的顺序。PriorityBlockingQueue类将使用该方法对任务进行排序,这样具有较高优先级的任务将优先执行。请注意,当getPriority()方法返回一个较低的值时,该任务具有较高的优先级。如果任务的getPriority()方法返回1,则该任务的优先级高于使用该方法返回2的任务的优先级。
@Overridepublic int compareTo(ConcurrentCommand o) {return Byte.compare(o.getPriority(), this.getPriority());}
- 具体的命令
我们已经在实现不同的命令类时做了稍许改动,而且还增加了一个由ConcurrentCancelCommand类实现的新类。这些类的主要逻辑都包含在execute()方法中,该方法计算查询的响应并且将其作为字符串返回。
ConcurrentCancelCommand新类的execute()方法调用了ConcurrentServer类的cancelTasks()方法。该方法将停止执行与参数中指定用户相关的所有待处理任务。
@Overridepublic String execute() {ConcurrentServer.cancelTasks(getUsername());String message = "Tasks of user "+getUsername()+" has been cancelled.";Logger.sendMessage(message);return message;}
ConcurrentReportCommand类的execute()方法使用WDIDAO类的query()方法获取用户请求的数据。在第3章中,你可以找到该方法的实现,这里实现过程基本一样。唯一的区别在于命令数组索引,如下所示:
@Overridepublic String execute() {WDIDAO dao=WDIDAO.getDAO();if (command.length==5) {return dao.query(command[3], command[4]);} else if (command.length==6) {try {return dao.query(command[3], command[4],Short.parseShort(command[5]));} catch (NumberFormatException e) {return "ERROR;Bad Command";}} else {return "ERROR;Bad Command";}}
ConcurrentQueryCommand类的execute()方法使用WDIDAO类的report()方法获取数据。在第3章中,可以找到该方法的实现。这里的实现过程几乎相同,唯一的区别在于命令数组索引。
@Overridepublic String execute() {WDIDAO dao=WDIDAO.getDAO();return dao.report(command[3]);}
ConcurrentStatusCommand类在其构造函数中有一个额外参数:Executor对象,该对象将执行这些命令。这些命令使用该对象获取有关执行器的信息,并且将其作为响应发送给用户。这一实现与第3章基本相同。我们使用同样的方法获取Executor对象的状态。
ConcurrentStopCommand类和ConcurrentErrorCommand类也与第3章相同,因此不再对其源代码进行说明。
4.2.3 服务器部件
服务器接收来自所有客户端的查询,创建执行这些查询的命令类并且将其发送给执行器。服务器部件由如下两个类实现。
ConcurrentServer类:该类包含服务器的main()方法以及额外用于撤销任务和结束系统执行的方法。RequestTask类:该类创建命令并且将其发送给执行器。
与第3章中的例子相比,最主要的区别在于RequestTask的角色。在SimpleServer例子中,ConcurrentServer类为每个查询创建一个RequestTask对象,并且将其发送给执行器。在本例中,只有一个将作为线程执行的RequestTask的实例。当ConcurrentServer收到一个连接,它将与客户端通信所需的套接字存放在待连接列表中。RequestTask线程读取该套接字,处理客户端发送的数据,创建相应的命令并且将其发送给执行器。
这样更改的主要原因是为了只在任务中留下执行器要执行的查询代码,而将预处理代码放在执行器之外。
ConcurrentServer类
ConcurrentServer类需要一些内部属性才能更好地工作。
- 一个
ParallelCache实例,用于调用缓存系统。 - 一个
ServerSocket实例,用于从客户端获取连接。 - 一个布尔值,用于指明该类何时要停止执行。
- 一个
LinkedBlockingQueue实例,用于存储那些向服务器发送消息的客户端的套接字。这些套接字将由RequestTask类处理。 - 一个
ConcurrentHashMap,用于存放执行器执行的每个任务所关联的Future对象。它的键是发送查询的用户名,而其值为另一个Map,该Map的键是一个ConcurrenCommand对象,它的值是与任务相关联的Future实例。使用这些Future实例撤销任务的执行。 - 一个
RequestTask实例,用于创建命令并且将它们发送给执行器。 - 一个
Thread对象,用于执行RequestTask对象。
这部分的代码如下。
public class ConcurrentServer {private static ParallelCache cache;private static volatile boolean stopped=false;private static LinkedBlockingQueue<Socket> pendingConnections;private static ConcurrentMap<String, ConcurrentMap<ConcurrentCommand, ServerTask<?>>>taskController;private static Thread requestThread;private static RequestTask task;
该类的main()方法初始化这些对象并且打开ServerSocket实例监听来自客户端的连接。此外,它创建了RequestTask对象并且将其作为线程执行。在此之后是一个循环,直到shutdown()方法改变了stopped属性的取值为止。此后,main()方法等待Excecutor对象结束,它要调用RequestTask对象的endTermination()方法,并且使用finishServer()方法关闭Logger系统和RequestTask对象。
public static void main(String[] args) {WDIDAO dao=WDIDAO.getDAO();cache=new ParallelCache();Logger.initializeLog();pendingConnections = new LinkedBlockingQueue<Socket>();taskController = new ConcurrentHashMap<String,ConcurrentHashMap<Integer, Future<?>>>();task=new RequestTask(pendingConnections, taskController);requestThread=new Thread(task);requestThread.start();System.out.println("Initialization completed.");serverSocket= new ServerSocket(Constants.CONCURRENT_PORT);do {try {Socket clientSocket = serverSocket.accept();pendingConnections.put(clientSocket);} catch (Exception e) {e.printStackTrace();}} while (!stopped);finishServer();System.out.println("Shutting down cache");cache.shutdown();System.out.println("Cache ok" + new Date());}
关闭服务器的执行器包括两种方法。shutdown()方法会改变stopped变量的取值并且关闭serverSocket实例。finishServer()方法用于停止执行器,中断执行RequestTask对象的线程,并且关闭Logger系统。我们将关闭服务器的过程划分为两部分,直到服务器的最后一条指令都可以使用Logger系统。
public static void shutdown() {stopped=true;try {serverSocket.close();} catch (IOException e) {e.printStackTrace();}}private static void finishServer() {System.out.println("Shutting down the server...");task.shutdown();System.out.println("Shutting down Request task");requestThread.interrupt();System.out.println("Request task ok");System.out.println("Closing socket");System.out.println("Shutting down logger");Logger.sendMessage("Shutting down the logger");Logger.shutdown();System.out.println("Logger ok");System.out.println("Main server thread ended");}
该服务器还包含了撤销用户关联任务的方法。正如前面提到的,Server类使用一个嵌套的ConcurrentHashMap存储所有与用户关联的任务。首先,获得含有用户所有任务的Map,然后调用Future对象的cancel()方法处理那些任务的所有Future对象。传递true值作为参数,这样,如果执行器正在执行一个来自该用户的任务,那么它将会中断。我们也给出了必要的代码以避免撤销ConcurrentCancelCommand。
public static void cancelTasks(String username) {ConcurrentMap<ConcurrentCommand, ServerTask<?>> userTasks =taskController.get(username);if (userTasks == null) {return;}int taskNumber = 0;Iterator<ServerTask<?>> it = userTasks.values().iterator();while(it.hasNext()) {ServerTask<?> task = it.next();ConcurrentCommand command = task.getCommand();if(!(command instanceof ConcurrentCancelCommand) &&task.cancel(true)) {taskNumber++;Logger.sendMessage("Task with code "+command.hashCode()+"cancelled: "+ command.getClass().getSimpleName());it.remove();}}String message=taskNumber+" tasks has been cancelled.";Logger.sendMessage(message);}
最后我们还给出了finishTask()方法,当任务正常执行结束时,该方法从ServerTask对象的嵌套Map中清除与该任务相关的Future对象。如下所示:
public static void finishTask(String username, ConcurrentCommandcommand) {ConcurrentMap<ConcurrentCommand, ServerTask<?>> userTasks =taskController.get(username);userTasks.remove(command);String message = "Task with code "+command.hashCode()+" has finished";Logger.sendMessage(message);}
RequestTask类
RequestTask类是ConcurrentServer类与Executor类之间的中介,ConcurrentServer类用于连接客户端,Executor类用于执行并发任务。RequestTask类打开与客户端连接的套接字,读取查询数据,创建适当的命令,并且将命令发送给执行器。
该类用到以下几个内部属性。
LinkedBlockingQueue:ConcurrentServer类在其中存储客户端套接字。ServerExecutor:将命令作为并发任务执行。ConcurrentHashMap:存储与任务相关的Future对象。
该类的构造函数初始化了上述所有对象。
public class RequestTask implements Runnable {private LinkedBlockingQueue<Socket> pendingConnections;private ServerExecutor executor = new ServerExecutor();private ConcurrentMap<String, ConcurrentMap<ConcurrentCommand,ServerTask<?>>> taskController;public RequestTask(LinkedBlockingQueue<Socket>pendingConnections, ConcurrentHashMap<String,ConcurrentHashMap<Integer, Future<?>>>taskController) {this.pendingConnections = pendingConnections;this.taskController = taskController;}
run()方法是该类的主要方法。它执行循环直到该线程中断处理存放在pendingConnections对象中的套接字。在该对象中,ConcurrentServer类存储了与不同客户端通信所用的套接字,并且这些客户端都向该服务器发送了一个查询。它打开套接字,读取数据并且创建相应的命令。它还将命令发送给执行器,并且在双重ConcurrentHashMap中存储Future对象,将该对象与任务的hashCode以及发送查询的用户相关联。
public void run() {try {while (!Thread.currentThread().interrupted()) {try {Socket clientSocket = pendingConnections.take();BufferedReader in = new BufferedReader(newInputStreamReader (clientSocket.getInputStream()));String line = in.readLine();Logger.sendMessage(line);ConcurrentCommand command;ParallelCache cache = ConcurrentServer.getCache();String ret = cache.get(line);if (ret == null) {String[] commandData = line.split(";");System.out.println("Command: " + commandData[0]);switch (commandData[0]) {case "q":System.out.println("Query");command = new ConcurrentQueryCommand(clientSocket,commandData);break;case "r":System.out.println("Report");command = new ConcurrentReportCommand (clientSocket,commandData);break;case "s":System.out.println("Status");command = new ConcurrentStatusCommand(executor,clientSocket,commandData);break;case "z":System.out.println("Stop");command = new ConcurrentStopCommand(clientSocket,commandData);break;case "c":System.out.println("Cancel");command = new ConcurrentCancelCommand (clientSocket,commandData);break;default:System.out.println("Error");command = new ConcurrentErrorCommand(clientSocket,commandData);break;}ServerTask<?> controller = (ServerTask<?>)executor.submit(command);storeContoller(command.getUsername(), controller, command);} else {PrintWriter out = new PrintWriter (clientSocket.getOutputStream(), true);System.out.println(ret);clientSocket.close();}} catch (IOException e) {e.printStackTrace();}}} catch (InterruptedException e) {// 不需要执行任何操作}}
storeController()方法在双重ConcurrentHashMap中存储Future对象。
private void storeContoller(String userName, ServerTask<?>controller, ConcurrentCommand command) {taskController.computeIfAbsent(userName, k -> newConcurrentHashMap<>()).put(command,controller);taskController.computeIfAbsent(userName, k -> newConcurrentHashMap<>()).put(command, controller);}
最后,给出了两个用于管理Executor类执行的方法。其中一个调用了面向执行器的shutdown()方法,而另一个方法则等待其结束。请记住,必须显式调用shutdown()方法或者shutdownNow()方法以终止执行器的执行。否则,程序不会结束。请看下面的代码:
public void shutdown() {String message="Request Task: "+pendingConnections.size()+"pending connections.";Logger.sendMessage(message);executor.shutdown();}public void terminate() {try {executor.awaitTermination(1,TimeUnit.DAYS);executor.writeStatistics();} catch (InterruptedException e) {e.printStackTrace();}}
4.2.4 客户端部件
现在,该是测试服务器的时候了。在本例中,并不用担心执行时间,测试的主要目标是检查新功能是否工作正常。
将客户端部件划分成下述两个类。
The ConcurrentClient类:该类实现了服务器单独的一个客户端。该类的每个实例都有不同的用户名称。该客户端创建了100个查询,其中90个为Query型,10个为Report型。Query型的查询其优先级为5,而Report型的查询优先级较低为10。MultipleConcurrentClient类:该类以并行方式度量了多个并发客户端的行为。我们使用了1到5个并发客户端测试服务器。该类还测试了cancel命令和stop命令。
我们采用一个执行器执行对服务器的并发请求,以便增加客户端的并发层级。
在下面的屏幕截图中,可以看到任务撤销的结果。

在本例中,用户USER_2的四个任务已被撤销。
下面的屏幕截图展示了每个用户的任务数量和执行时间的最终统计信息。

