12.1 监视并发对象
Java并发API提供的大多数并发对象都含有可获知该对象状态的方法。这些状态包括当前正在执行的线程数、被阻断且等待某一条件的线程数、执行的任务数等。本节,你将学习要用到的最重要的方法,以及可通过这些方法获取到的信息。这些信息对于检测导致错误的原因很有用,尤其是在错误仅在某些罕见条件下发生之时。
12.1.1 监视线程
线程是Java并发API中最基本的元素。它允许你执行一个原始任务。当线程开始执行时,你可以决定执行什么样的代码(扩展Thread类或者实现Runnable接口),以及如何使其与应用程序的其他任务同步。Thread类提供了一些可以获取线程信息的方法。其中最有用的一些方法如下。
getId():该方法返回线程的标识符。标识符是一个long型的正数,而且是唯一的。getName():该方法返回线程的名称。默认情况下,其命名格式为Thread-xxx,不过线程名称可以在构造函数中修改,也可以使用setName()方法修改。getPriority():该方法返回线程的优先级。默认情况下,所有线程的优先级都为5,但可以使用setPriority()方法来更改。优先较高的线程比优先级较低的线程更容易被优先选用。getState():该方法返回线程的状态。它返回Enum Thread.State中的一个值,且其取值可以为NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING和TERMINATED。可查看API文档来了解每个状态的真实含义。getStackTrace():该方法将线程的调用栈作为一个StackTraceElement对象数组返回。可以打印该数组,以了解该线程被做了哪些调用。
例如,可以使用如下这样一段代码来获取与某个线程相关的信息。
System.out.println("**********************");System.out.println("Id: " + thread.getId());System.out.println("Name: " + thread.getName());System.out.println("Priority: " + thread.getPriority());System.out.println("Status: " + thread.getState());System.out.println("Stack Trace");for(StackTraceElement ste : thread.getStackTrace()) {System.out.println(ste);}System.out.println("**********************\n");
通过这段代码,可以得到如下输出。

12.1.2 监视锁
锁是Java并发API提供的基本同步元素之一。它在Lock接口和ReentrantLock类中定义。基本上,锁允许你在代码中定义一个临界段,不过,锁机制要比synchronized关键字等其他机制更加灵活(例如,你可以针对读写操作定义不同的锁,或者定义非线性的临界段)。ReentrantLock类还有一些方法可以帮助你获知Lock对象的状态。
getOwner():该方法返回一个Thread对象,其中含有当前加锁的线程,也就是说,该线程正在执行临界段。hasQueuedThreads():该方法返回一个布尔值,它表示是否有线程等待获取锁。getQueueLength():该方法返回一个int值,它表示当前等待获取锁的线程数。getQueuedThreads():该方法返回一个Collection对象,其中含有当前等待获取锁的Thread对象。isFair():该方法返回一个布尔值,表示公平属性的状态。该属性的值用于判定下一个获取锁的线程。可查看Java API相关信息来详细了解这一功能。isLocked():该方法返回一个布尔值,表示锁是否归某个线程所有。getHoldCount():该方法返回一个int值,该值表示当前线程获取到锁的次数。如果当前线程并没有得到锁,则返回值为0。否则,对于当前没有调用相匹配的unlock()方法的线程,该方法将返回lock()方法在该线程中被调用的次数。
getOwner()方法和getQueuedThreads()方法是受保护的,因此不能直接访问。要解决这一问题,你可以实现自己的Lock类,并且实现能够提供这些信息的方法。
例如,你可以定义一个名为MyLock的类,如下所示:
public class MyLock extends ReentrantLock {private static final long serialVersionUID = 8025713657321635686L;public String getOwnerName() {if (this.getOwner() == null) {return "None";}return this.getOwner().getName();}public Collection<Thread> getThreads() {return this.getQueuedThreads();}}
这样,可以使用一段类似下面的代码来获取与某个锁相关的全部信息。
System.out.println("************************\n");System.out.println("Owner : " + lock.getOwnerName());System.out.println("Queued Threads: " + lock.hasQueuedThreads());if (lock.hasQueuedThreads()) {System.out.println("Queue Length: " + lock.getQueueLength());System.out.println("Queued Threads: ");Collection<Thread> lockedThreads = lock.getThreads();for (Thread lockedThread : lockedThreads) {System.out.println(lockedThread.getName());}}System.out.println("Fairness: " + lock.isFair());System.out.println("Locked: " + lock.isLocked());System.out.println("Holds: "+lock.getHoldCount());System.out.println("************************\n");
通过该代码块,你将得到类似如下所示的输出结果。

12.1.3 监视执行器
执行器框架是这样一种机制:它允许你执行并发任务而无须考虑线程的创建和管理问题。你可以将任务发送给执行器。它有一个内部线程池,执行任务时可以再利用。执行器也提供了一种机制来控制任务所消耗的资源,这样你就无须担心系统过载。执行器框架提供了Executor接口和ExecutorService接口,以及一些实现这些接口的类。这其中最基本的类是ThreadPoolExecutor,它提供了一些方法,可以帮助你获知执行器的状态。
getActiveCount():该方法返回执行器中正在执行任务的线程数。getCompletedTaskCount():该方法返回执行器已经执行且已完成执行的任务数。getCorePoolSize():该方法返回核心线程数目。这一数目决定了线程池中的最小线程数。即使执行器中没有任务运行,线程池中的线程数也不会少于该方法所返回的数目。getLargestPoolSize():该方法返回执行器线程池已经同时执行过的最大线程数。getMaximumPoolSize():该方法返回执行器线程池中同时可以存在的最大线程数。getPoolSize():该方法返回线程池中当前的线程数。getTaskCount():该方法返回已经发送给执行器的任务数,包括正在等待、运行中和已经完成的任务。isTerminated():如果调用了shutdown()或shutdownNow()方法并且执行器已完成了所有未完成任务的执行,则该方法返回true,否则返回false。isTerminating():如果调用了shutdown()或shutdownNow()方法,但是执行器仍然在执行任务,则该方法返回true。
可以使用类似如下的代码片段获取有关ThreadPoolExecutor的信息。
System.out.println ("*******************************************");System.out.println("Active Count: "+executor.getActiveCount());System.out.println("Completed Task Count: "+executor.getCompletedTaskCount());System.out.println("Core Pool Size:"+ executor.getCorePoolSize());System.out.println("Largest Pool Size: "+ executor.getLargestPoolSize());System.out.println("Maximum Pool Size: "+ executor.getMaximumPoolSize());System.out.println("Pool Size: "+executor.getPoolSize());System.out.println("Task Count: "+executor.getTaskCount());System.out.println("Terminated: "+executor.isTerminated());System.out.println("Is Terminating: "+executor.isTerminating());System.out.println ("*******************************************");
通过这段代码,可以得到类似如下的输出。

12.1.4 监视Fork/Join框架
Fork/Join框架提供了一种特殊的执行器,主要针对那些可以使用分治方法实现的算法。它基于工作窃取算法。创建一个用于处理整个问题的初始任务,该任务再创建其他子任务,每个子任务都处理问题的一部分(相对较小),并且等待任务执行完毕。分割后的每个任务都将它要处理的子问题的规模和预定义规模相比较,如果子问题的规模小于预定义规模,则直接求解该问题;否则,它将问题再次分割给其子任务处理,并且等待这些子任务返回结果。工作窃取算法利用了那些执行任务的线程,它们等待子任务返回结果并执行其他任务。ForkJoinPool类提供了如下方法以获取其状态。
getParallelism():该方法返回线程池确立的并行处理的预期层级。getPoolSize():该方法返回线程池中的线程数。getActiveThreadCount():该方法返回线程池中当前执行任务的线程数。getRunningThreadCount():该方法返回并不等待其子任务完成的线程的数量。getQueuedSubmissionCount():该方法返回已经提交给线程池但是尚未开始执行的任务数。getQueuedTaskCount():该方法返回线程池工作窃取队列中的任务数。hasQueuedSubmissions():如果有任务提交给线程池且尚未开始执行,则该方法返回true,否则返回false。getStealCount():该方法返回Fork/Join池执行工作窃取算法的次数。isTerminated():如果Fork/Join池完成执行,则该方法返回true,否则返回false。
可以使用如下所示的代码片段获得ForkJoinPool类的相关信息。
System.out.println("**********************");System.out.println("Parallelism: "+ pool.getParallelism());System.out.println("Pool Size: "+ pool.getPoolSize());System.out.println("Active Thread Count: "+ pool.getActiveThreadCount());System.out.println("Running Thread Count: "+ pool.getRunningThreadCount());System.out.println("Queued Submission: "+ pool.getQueuedSubmissionCount());System.out.println("Queued Tasks: "+pool.getQueuedTaskCount());System.out.println("Queued Submissions: "+ pool.hasQueuedSubmissions());System.out.println("Steal Count: "+ pool.getStealCount());System.out.println("Terminated : "+ pool.isTerminated());System.out.println("**********************");
在这里pool是一个ForkJoinPool对象(例如ForkJoinPool.commonPool())。使用这段代码,将得到下面这样的输出结果。

12.1.5 监视Phaser
Phaser是一种同步机制,允许执行可划分为多个阶段的任务。该类也包含一些用于获取Phaser状态的方法。
getArrivedParties():该方法返回已经完成当前阶段的已注册参与方的数量。getUnarrivedParties():该方法返回尚未完成当前阶段的已注册参与方的数量。getPhase():该方法返回当前阶段的编号。第一个阶段的编号为0。getRegisteredParties():该方法返回Phaser中已注册参与方的数量。isTerminated():该方法返回一个布尔值,用于指示Phaser是否已经完成执行。
可以使用如下代码片断获取Phaser的相关信息。
System.out.println ("*******************************************");System.out.println("Arrived Parties: "+ phaser.getArrivedParties());System.out.println("Unarrived Parties: "+ phaser.getUnarrivedParties());System.out.println("Phase: "+phaser.getPhase());System.out.println("Registered Parties: "+ phaser.getRegisteredParties());System.out.println("Terminated: "+phaser.isTerminated());System.out.println ("*******************************************");
通过这段代码,可以得到如下输出结果。

12.1.6 监视流API
流机制是Java 8中引入的最重要的新特性之一。它允许以并发方式处理大规模数据集,对该数据进行转换,并且以一种简单的方式实现MapReduce编程模型。该类并不提供任何获知流的状态的方法(除了isParallel()方法,它将返回流是否为并行的),不过其中含有一个名为peek()的方法,可以置于多个方法的流水线处理之中,用以输出与在流中执行的操作或变换相关的日志信息。
例如,下面这段代码计算了前999个数的平方的平均值。
double result=IntStream.range(0,1000).parallel().peek(n -> System.out.println (Thread.currentThread().getName()+": Number "+n)).map(n -> n*n).peek(n -> System.out.println (Thread.currentThread().getName()+": Transformer "+n)).average().getAsDouble();
第一个peek()方法输出该流处理的数,而第二个peek()方法则输出这些数的平方。如果执行这段代码,那么因为你是以并发方式执行该流的,所以将得到如下的输出结果。

