11.2 同步机制
任务的同步机制是任务之间为得到预期结果而进行的协调。在并发应用程序中,有两种同步机制。
- 进程同步:想要控制任务的执行顺序时,就可以使用这种同步。例如,一个任务必须等待另一任务终止才开始执行。
- 数据同步:当两个或多个任务访问同一内存对象时,可以使用这种同步。在这种情况下,必须保护写入操作对该对象的访问权限。如果不这样做,就会出现数据竞争条件,一个程序的最终结果在每次执行时都不同。
Java并发API提供了多种机制,让你可以实现上述两种类型的同步。Java语言提供的最基本的同步机制是synchronized关键字。该关键字可应用于某个方法或者某个代码块。对于第一种情况,一次只有一个线程可以执行该方法。对于第二种情况,要指定一个对某个对象的引用。在这种情况下,同时只能执行被某一对象保护的一个代码块。
Java也提供了其他一些同步机制。
Lock接口及其实现类:该机制允许你实现一个临界段,保证只有一个线程执行该代码块。Semaphore类实现了由Edsger Dijkstra提出的著名的信号量同步机制。CountDownLatch允许你实现这样的场景:一个或多个线程等待其他线程结束。CyclicBarrier允许你将不同的任务同步到某个共同的节点。Phaser类允许你分为多个阶段实现并发任务。第6章中已经详细介绍了这种机制。Exchanger允许你在两个线程之间实现一个数据交换点。CompletableFuture是Java 8的新特性,它扩展了执行器任务的Future机制,以一种异步方式生成任务的结果。可以指定任务在结果生成之后执行,这样就可以控制任务的执行顺序。
下面将介绍如何使用这些机制,着重讲述Java 8中引入的CompletableFuture机制。
11.2.1 CommonTask类
我们实现了一个名为CommonTask的类。该类将在随机的一段时间(0到10秒)内将调用线程休眠。其源代码如下。
public class CommonTask {public static void doTask() {long duration = ThreadLocalRandom.current().nextLong(10);System.out.printf("%s-%s: Working %d seconds\n",new Date(),Thread.currentThread().getName(),duration);try {TimeUnit.SECONDS.sleep(duration);} catch (InterruptedException e) {e.printStackTrace();}}}
以下各节中实现的所有任务都要用该类来模拟其执行时间。
11.2.2 Lock接口
最基本的一种同步机制就是Lock接口及其实现类。基本实现类是ReentrantLock类。可以方便地使用该类实现一个临界段。例如,下面的任务在代码的第一行使用lock()方法获得了一个锁,并且在代码的最后一行使用unlock()方法释放了该锁。你必须在finally部分调用unlock()方法以避免出现问题。否则,如果抛出异常,则该锁将不被释放,会出现死锁。同时只有一个任务可以执行这两条语句之间的代码。
public class LockTask implements Runnable {private static ReentrantLock lock = new ReentrantLock();private String name;public LockTask(String name) {this.name=name;}@Overridepublic void run() {try {lock.lock();System.out.println("Task: " + name + "; Date: " + new Date()+ ": Running the task");CommonTask.doTask();System.out.println("Task: " + name + "; Date: " + new Date()+ ": The execution has finished");} finally {lock.unlock();}}}
你可以对此进行验证,例如,使用下述代码在一个执行器中执行10个任务。
public class LockMain {public static void main(String[] args) {ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();for (int i=0; i<10; i++) {executor.execute(new LockTask("Task "+i));}executor.shutdown();try {executor.awaitTermination(1, TimeUnit.DAYS);} catch (InterruptedException e) {e.printStackTrace();}}}
在下图中可以看到执行该例的结果。你会发现如何一次只执行一个任务。

11.2.3 Semaphore类
信号量机制是Edsger Dijkstra于1962年提出的,用于控制对一个或多个共享资源的访问。该机制基于一个内部计数器以及两个名为wait()和signal()的方法。当一个线程调用了wait()方法时,如果内部计数器的值大于0,那么信号量对内部计数器做递减操作,并且该线程获得对该共享资源的访问。如果内部计数器的值为0,那么线程将被阻塞,直到某个线程调用singal()方法为止。当一个线程调用了signal()方法时,信号量将会检查是否有某些线程处于等待状态(它们已经调用了wait()方法)。如果没有线程等待,它将对内部计数器做递增操作。如果有线程在等待信号量,就获取这其中的一个线程,该线程的wait()方法结束返回并且访问共享资源。其他线程将继续等待,直到轮到自己为止。
在Java中,信号量在Semaphore类中实现。wait()方法被称作acquire(),而signal()方法被称作release()。例如,在本例中便用到了一个采用Semaphore类保护其代码的任务。
public class SemaphoreTask implements Runnable{private Semaphore semaphore;public SemaphoreTask(Semaphore semaphore) {this.semaphore=semaphore;}@Overridepublic void run() {try {semaphore.acquire();CommonTask.doTask();} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();}}}
在主程序中执行了10个任务,它们共享一个Semaphore类。该类使用两个共享资源初始化,这样就可以同时运行两个任务。
public static void main(String[] args) {Semaphore semaphore=new Semaphore(2);ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();for (int i=0; i<10; i++) {executor.execute(new SemaphoreTask(semaphore));}executor.shutdown();try {executor.awaitTermination(1, TimeUnit.DAYS);} catch (InterruptedException e) {e.printStackTrace();}}
下面的屏幕截图展示了该例的执行结果。可以看出有两个任务在同时运行。

11.2.4 CountDownLatch类
该类提供了一种等待一个或多个并发任务完成的机制。它有一个内部计数器,必须使用要等待的任务数初始化。然后,await()方法休眠调用线程,直到内部计数器为0,并且使用countDown()方法对该内部计数器做递减操作。
例如,在该任务中使用countDown()方法对CountDownLatch对象(作为构造函数的参数)的内部计数器做递减操作。
public class CountDownTask implements Runnable {private CountDownLatch countDownLatch;public CountDownTask(CountDownLatch countDownLatch) {this.countDownLatch=countDownLatch;}@Overridepublic void run() {CommonTask.doTask();countDownLatch.countDown();}}
然后,在main()方法中,在执行器中执行这些任务,并且使用CountDownLatch类的await()方法等待任务完成。countDownLatch对象采用要等待的任务数进行初始化。
public static void main(String[] args) {CountDownLatch countDownLatch=new CountDownLatch(10);ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();System.out.println("Main: Launching tasks");for (int i=0; i<10; i++) {executor.execute(new CountDownTask(countDownLatch));}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.executor.shutdown();}
下面的屏幕截图展现了本例的执行结果。

11.2.5 CyclicBarrier类
该类允许将一些任务同步到某个共同点。所有的任务都在该点等待,直到任务全部到达该点为止。从内部来看,该类还管理了一个内部计数器,用于记录尚未到达该点的任务。当一个任务到达指定点时,它要执行await()方法以等待其他任务。当所有任务都到达时,CyclicBarrier对象将它们唤醒,这样就能够继续执行。
当所有的参与方都到达后,该类允许执行另一个任务。为了实现这一点,要在该对象的构造函数中指定一个Runnable对象。
例如,我们实现了下面的Runnable接口,它采用一个CyclicBarrier对象来等待其他任务。
public class BarrierTask implements Runnable {private CyclicBarrier barrier;public BarrierTask(CyclicBarrier barrier) {this.barrier=barrier;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+": Phase 1");CommonTask.doTask();try {barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+": Phase 2");}}
我们还实现了另一个Runnable对象,当所有的任务都执行了await()方法之后,它将被CyclicBarrier执行。
public class FinishBarrierTask implements Runnable {@Overridepublic void run() {System.out.println("FinishBarrierTask: All the tasks have finished");}}
最后,在main()方法中,在一个执行器中执行了10个任务。可以发现,CyclicBarrier对象是用我们要同步的任务数和FinishBarrierTask对象来初始化的。
public static void main(String[] args) {CyclicBarrier barrier=new CyclicBarrier(10,new FinishBarrierTask());ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();for (int i=0; i<10; i++) {executor.execute(new BarrierTask(barrier));}executor.shutdown();try {executor.awaitTermination(1, TimeUnit.DAYS);} catch (InterruptedException e) {e.printStackTrace();}}
下面的屏幕截图展示了本例的执行结果。

可以看到,当所有的任务都到达调用await()方法的公共点时,将执行FinishBarrierTask,然后所有的任务都继续其执行过程。
11.2.6 CompletableFuture类
这是在Java 8并发API中引入的一种同步机制,在Java 9中又有了一些新方法。它扩展了Future机制,为其赋予了更强的功能和更大的灵活性。它允许实现一个事件驱动的模型,链接那些只有当其他任务执行完毕后才执行的任务。与Future接口相同,CompletableFuture也必须采用操作要返回的结果类型进行参数化。和Future对象一样,CompletableFuture类表示的是异步计算的结果,只不过CompletableFuture的结果可以由任意线程确立。当计算正常结束时,该类采用complete()方法确定结果,而当计算出现异常时,则采用completeExceptionally()方法。如果两个或者多个线程调用同一CompletableFuture的complete()方法或completeExceptionally()方法,那么只有第一个调用会起作用。
首先,可以使用构造函数创建CompletableFuture对象。在本例中,你需要使用前面介绍的complete()方法确定任务结果。不过,也可以使用runAsync()方法或者supplyAsync()创建一个任务结果。runAsync()方法执行一个Runnable对象并且返回CompletableFuture,这样计算就不能再返回任何结果了。supplyAsync()方法执行了Supplier接口的一个实现,它采用本次计算要返回的类型进行参数化。该Supplier接口提供了get()方法。在该方法中,需要包含任务代码并且返回任务生成的结果。在本例中,CompletableFuture的结果将作为Supplier接口的结果。
该类提供了大量方法,允许通过实现一个事件驱动的模型组织任务的执行顺序,一个任务只有在其之前的任务完成之后才会开始。这其中包括如下方法。
thenApplyAsync():该方法接收Function接口的一个实现(可以表示为一个lambda表达式)作为参数。该函数将在调用CompletableFuture完成后执行。该方法将返回CompletableFuture以获得Fuction的结果。thenComposeAsync():该方法和thenApplyAsync()方法相似,但是当供给函数也返回CompletableFuture时很有用。thenAcceptAsync():该方法和前一个方法相似,只不过其参数是Consumer接口的一个实现(也可以描述为一个lambda表达式);在这种情况下,计算不会返回结果。thenRunAsync():该方法和前一个等价,只不过在这种情况下接收一个Runnable对象作为参数。thenCombineAsync():该方法接收两个参数。第一个参数为另一个CompletableFuture实例,另一个参数是BiFunction接口的一个实现(可描述为一个lambda函数)。该BiFunction接口实现将在两个CompletableFuture(当前调用的和参数中的)都完成后执行。该方法将返回CompletableFuture以获取BiFunction的结果。runAfterBothAsync():该方法接收两个参数。第一个参数为另一个CompletableFuture,而第二个参数为Runnable接口的一个实现,它将在两个CompletableFuture(当前调用的和参数中的)都完成后执行。runAfterEitherAsync():该方法与前一个方法等价,只不过当其中一个CompletableFuture对象完成之后才会执行Runnable任务。allOf():该方法接收CompletableFuture对象的一个变量列表作为参数。它将返回一个CompletableFuture对象,而该对象将在所有的CompletableFuture对象都完成之后返回其结果。anyOf():该方法和前一个方法等价,只是返回的CompletableFuture对象会在其中一个CompletableFuture对象完成之后返回其结果。
最后,如果想要获取CompletableFuture返回的结果,可以使用get()方法或者join()方法。这两个方法都会阻塞调用线程,直到CompletableFuture完成之后返回其结果。这两个方法之间的主要区别在于,get()方法抛出ExecutionException(这是一个校验异常),而join()方法抛出RuntimeException(这是一个未校验异常)。因此,在不抛出异常的lambda(例如Supplier、Consumer或Runnable)内部,使用join()方法更为方便。
前面提到的大多数方法都有Async后缀。这意味着这些方法将使用ForkJoinPool.commonPool实例以并发方式执行。这些方法都有不带Async后缀的版本,它们将以串行方式执行(这就是说,与执行CompletableFuture的线程是同一个);还有带Async后缀并且以一个执行器实例作为额外参数的版本。这种情况下,CompletableFuture将在作为参数传递的执行器中以异步方式执行。
Java 9增加了一些方法,为CompletableFuture类赋予了更强的功能。
defaultExecutor():该方法用于返回并不接收Executor作为参数的那些异步操作的默认执行器。通常,它将是ForkJoinPool.commonPool()方法的返回值。copy():该方法创建CompletableFuture对象的一个副本。如果原来的CompletableFuture正常完成,则副本方法也将正常完成并返回相同的值。如果原来的CompletableFuture异常完成,则副本方法也异常完成,并且抛出CompletionException异常。completeAsync(): 该方法接收一个Supplier对象作为参数(还可以选择Executor)。借助Supplier的结果完成CompletableFuture。orTimeout():该方法接收一段时延(一段时间和一个TimeUnit)。如果CompletableFuture在这段时间之后没有完成,那么抛出TimeoutException异常并异常完成。completeOnTimeout():该方法与上一个方法相似,只不过它在作为参数的值的范围内正常完成。delayedExecutor():该方法返回一个Executor,该执行器在执行指定时延之后执行某一任务。
使用CompletableFuture类
在本例中,你将学会如何使用CompletableFuture类以并发方式执行一些异步任务。我们将用由亚马逊的20 000个商品构成的集合实现下面的任务树。

首先要使用这些范例(商品)。然后执行四个并发任务。第一个任务是搜索商品。当搜索完成后,将结果写入一个文件。第二个任务是获得评价最高的商品。第三个任务是获得销量最佳的商品。当这两个任务完成之后,将使用另一个任务将它们的信息连接起来。最后,第四个任务是获取购买过商品的用户列表。main()方法将等待所有任务结束,然后输出结果。
下面看看实现过程的细节。
- 辅助任务
在本例中,将用到一些辅助任务。第一个任务是LoadTask,用于从磁盘加载商品信息并且返回一个Product对象列表。
public class LoadTask implements Supplier<List<Product>> {private Path path;public LoadTask (Path path) {this.path=path;}@Overridepublic List<Product> get() {List<Product> productList=null;try {productList = Files.walk(path, FileVisitOption.FOLLOW_LINKS).parallel().filter(f -> f.toString().endsWith(".txt")).map(ProductLoader::load).collect (Collectors.toList());} catch (IOException e) {e.printStackTrace();}return productList;}}
该任务实现了Supplier接口,将其作为CompletableFuture执行。从内部来看,它使用一个流来处理和解析所有包含商品列表的文件。
第二个任务是SearchTask,该任务将实现对Product对象列表的搜索,查找在名称中含有某一单词的对象。该任务是Function接口的一个实现。
public class SearchTask implements Function<List<Product>,List<Product>> {private String query;public SearchTask(String query) {this.query=query;}@Overridepublic List<Product> apply(List<Product> products) {System.out.println(new Date()+": CompletableTask: start");List<Product> ret = products.stream().filter(product -> product.getTitle().toLowerCase().contains(query)).collect(Collectors.toList());System.out.println(new Date()+": CompletableTask: end:"+ret.size());return ret;}}
它接收含有全部商品信息的List,返回一个含有满足标准的商品的List。从内部来看,它基于输入列表创建了流,对其进行筛选,并且将结果收集到另一个列表中。
最后,WriteTask将搜索任务中获得的商品写入一个File对象。在我们的例子中生成了一个HTML文件,不过也可以以想要的格式输出这一信息。该任务实现了Consumer接口,这样它的代码就必须采用如下形式。
public class WriteTask implements Consumer<List<Product>> {@Overridepublic void accept(List<Product> products) {// 实现部分省略}}
main()方法
我们在main()方法中对这些任务进行了组织。首先,使用CompletableFuture类的supplyAsync()方法执行LoadTask。在LoadTask开始之前将等待3秒,以展示delayExecutor()方法如何工作。
public class CompletableMain {public static void main(String[] args) {Path file = Paths.get("data","category");System.out.println(new Date() + ": Main: Loading productsafter three seconds....");LoadTask loadTask = new LoadTask(file);CompletableFuture<List<Product>>loadFuture = CompletableFuture.supplyAsync(loadTask,CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));
然后,有了生成的CompletableFuture,就可以在加载任务完成之后使用thenApplyAsync()执行搜索任务。
System.out.println(new Date() + ": Main: Then apply forsearch");CompletableFuture<List<Product>> completableSearch = loadFuture.thenApplyAsync(new SearchTask("love"));
一旦搜索任务完成,就要将执行结果输出到一个文件。由于该任务并不返回结果,我们使用thenAcceptAsync()方法。
CompletableFuture<Void> completableWrite = completableSearch.thenAcceptAsync(new WriteTask());completableWrite.exceptionally(ex -> {System.out.println(new Date() + ": Main: Exception "+ ex.getMessage());return null;});
如果写入任务抛出异常,那么使用exceptionally()方法指定要做的事项。
然后,在completableFuture对象上使用thenApplyAsync()方法执行该任务,以便获取购买某一商品的用户列表。将该任务描述为一个lambda表达式。请注意,该任务并不会与搜索任务并行执行。
System.out.println(new Date() + ": Main: Then apply for users");CompletableFuture<List<String>> completableUsers = loadFuture.thenApplyAsync(resultList -> {System.out.println(new Date()+ ": Main: Completable users: start");List<String> users = resultList.stream().flatMap(p -> p.getReviews().stream()).map(review -> review.getUser()).distinct().collect(Collectors.toList());System.out.println(new Date() + ": Main: Completable users: end");return users;});
并行处理这些任务时,我们还使用thenApplyAsync()方法执行了该任务,以便查找评价最高的商品和销量最佳的商品。我们也使用一个lambda表达式定义了这些任务。
System.out.println(new Date() + ": Main: Then apply for bestrated product....");CompletableFuture<Product> completableProduct = loadFuture.thenApplyAsync(resultList -> {Product maxProduct = null;double maxScore = 0.0;System.out.println(new Date() + ": Main: Completable product:start");for (Product product : resultList) {if (!product.getReviews().isEmpty()) {double score = product.getReviews().stream().mapToDouble(review -> review.getValue()).average().getAsDouble();if (score > maxScore) {maxProduct = product;maxScore = score;}}}System.out.println(new Date() + ": Main: Completable product : end");return maxProduct;});System.out.println(new Date() + ": Main: Then apply for bestselling product....");CompletableFuture<Product> completableBestSellingProduct =loadFuture.thenApplyAsync(resultList -> {System.out.println(new Date() + ": Main: Completable bestselling: start");Product bestProduct = resultList.stream().min(Comparator.comparingLong(Product::getSalesrank)).orElse(null);System.out.println(new Date() + ": Main: Completable bestselling: end");return bestProduct;});
如前所述,我们想将前两个任务的结果连到一起。可以使用thenCombineAsync()方法完成这一工作,用它指定一个将在这两个任务完成之后执行的任务。
CompletableFuture<String> completableProductResult =completableBestSellingProduct.thenCombineAsync(completableProduct, (bestSellingProduct,bestRatedProduct) -> {System.out.println(new Date() + ": Main: Completable productresult: start");String ret = "The best selling product is "+ bestSellingProduct.getTitle() + "\n";ret += "The best rated product is "+ bestRatedProduct.getTitle();System.out.println(new Date() + ": Main: Completable productresult: end");return ret;});
最后,使用completeOnTimeout()方法预留1秒钟,以等待completableProductResult任务完成。如果它在1秒之内没有完成,那么完成CompletableFuture,并得出结果TimeOut。然后,使用allOf()方法和join()方法等待最终任务结束,并且输出使用get()方法获得的结果。
System.out.println(new Date() + ": Main: Waiting for results");completableProductResult.completeOnTimeout("TimeOut", 1,TimeUnit.SECONDS);CompletableFuture<Void> finalCompletableFuture = CompletableFuture.allOf(completableProductResult, completableUsers,completableWrite);finalCompletableFuture.join();try {System.out.println("Number of loaded products: "+ loadFuture.get().size());System.out.println("Number of found products: "+ completableSearch.get().size());System.out.println("Number of users: "+ completableUsers.get().size());System.out.println("Best rated product: "+ completableProduct.get().getTitle());System.out.println("Best selling product: "+ completableBestSellingProduct.get().getTitle());System.out.println("Product result:"+completableProductResult.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
在下面的屏幕截图中,可以看到本例的执行结果。

首先,main()方法执行所有配置并等待任务完成。这些任务按照配置的顺序执行。可以看到,LoadTask在三秒钟之后启动,以及completableProductResult返回字符串TimeOut,因为它在1秒钟之内还没有完成。
