11.2 同步机制

任务的同步机制是任务之间为得到预期结果而进行的协调。在并发应用程序中,有两种同步机制。

  • 进程同步:想要控制任务的执行顺序时,就可以使用这种同步。例如,一个任务必须等待另一任务终止才开始执行。
  • 数据同步:当两个或多个任务访问同一内存对象时,可以使用这种同步。在这种情况下,必须保护写入操作对该对象的访问权限。如果不这样做,就会出现数据竞争条件,一个程序的最终结果在每次执行时都不同。

Java并发API提供了多种机制,让你可以实现上述两种类型的同步。Java语言提供的最基本的同步机制是synchronized关键字。该关键字可应用于某个方法或者某个代码块。对于第一种情况,一次只有一个线程可以执行该方法。对于第二种情况,要指定一个对某个对象的引用。在这种情况下,同时只能执行被某一对象保护的一个代码块。

Java也提供了其他一些同步机制。

  • Lock接口及其实现类:该机制允许你实现一个临界段,保证只有一个线程执行该代码块。
  • Semaphore类实现了由Edsger Dijkstra提出的著名的信号量同步机制。
  • CountDownLatch允许你实现这样的场景:一个或多个线程等待其他线程结束。
  • CyclicBarrier允许你将不同的任务同步到某个共同的节点。
  • Phaser类允许你分为多个阶段实现并发任务。第6章中已经详细介绍了这种机制。
  • Exchanger允许你在两个线程之间实现一个数据交换点。
  • CompletableFuture是Java 8的新特性,它扩展了执行器任务的Future机制,以一种异步方式生成任务的结果。可以指定任务在结果生成之后执行,这样就可以控制任务的执行顺序。

下面将介绍如何使用这些机制,着重讲述Java 8中引入的CompletableFuture机制。

11.2.1 CommonTask

我们实现了一个名为CommonTask的类。该类将在随机的一段时间(0到10秒)内将调用线程休眠。其源代码如下。

  1. public class CommonTask {
  2. public static void doTask() {
  3. long duration = ThreadLocalRandom.current().nextLong(10);
  4. System.out.printf("%s-%s: Working %d seconds\n",
  5. new Date(),Thread.currentThread().getName(),
  6. duration);
  7. try {
  8. TimeUnit.SECONDS.sleep(duration);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. }

以下各节中实现的所有任务都要用该类来模拟其执行时间。

11.2.2 Lock接口

最基本的一种同步机制就是Lock接口及其实现类。基本实现类是ReentrantLock类。可以方便地使用该类实现一个临界段。例如,下面的任务在代码的第一行使用lock()方法获得了一个锁,并且在代码的最后一行使用unlock()方法释放了该锁。你必须在finally部分调用unlock()方法以避免出现问题。否则,如果抛出异常,则该锁将不被释放,会出现死锁。同时只有一个任务可以执行这两条语句之间的代码。

  1. public class LockTask implements Runnable {
  2. private static ReentrantLock lock = new ReentrantLock();
  3. private String name;
  4. public LockTask(String name) {
  5. this.name=name;
  6. }
  7. @Override
  8. public void run() {
  9. try {
  10. lock.lock();
  11. System.out.println("Task: " + name + "; Date: " + new Date()
  12. + ": Running the task");
  13. CommonTask.doTask();
  14. System.out.println("Task: " + name + "; Date: " + new Date()
  15. + ": The execution has finished");
  16. } finally {
  17. lock.unlock();
  18. }
  19. }
  20. }

你可以对此进行验证,例如,使用下述代码在一个执行器中执行10个任务。

  1. public class LockMain {
  2. public static void main(String[] args) {
  3. ThreadPoolExecutor executor=(ThreadPoolExecutor)
  4. Executors.newCachedThreadPool();
  5. for (int i=0; i<10; i++) {
  6. executor.execute(new LockTask("Task "+i));
  7. }
  8. executor.shutdown();
  9. try {
  10. executor.awaitTermination(1, TimeUnit.DAYS);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. }

在下图中可以看到执行该例的结果。你会发现如何一次只执行一个任务。

11.2 同步机制 - 图1

11.2.3 Semaphore

信号量机制是Edsger Dijkstra于1962年提出的,用于控制对一个或多个共享资源的访问。该机制基于一个内部计数器以及两个名为wait()signal()的方法。当一个线程调用了wait()方法时,如果内部计数器的值大于0,那么信号量对内部计数器做递减操作,并且该线程获得对该共享资源的访问。如果内部计数器的值为0,那么线程将被阻塞,直到某个线程调用singal()方法为止。当一个线程调用了signal()方法时,信号量将会检查是否有某些线程处于等待状态(它们已经调用了wait()方法)。如果没有线程等待,它将对内部计数器做递增操作。如果有线程在等待信号量,就获取这其中的一个线程,该线程的wait()方法结束返回并且访问共享资源。其他线程将继续等待,直到轮到自己为止。

在Java中,信号量在Semaphore类中实现。wait()方法被称作acquire(),而signal()方法被称作release()。例如,在本例中便用到了一个采用Semaphore类保护其代码的任务。

  1. public class SemaphoreTask implements Runnable{
  2. private Semaphore semaphore;
  3. public SemaphoreTask(Semaphore semaphore) {
  4. this.semaphore=semaphore;
  5. }
  6. @Override
  7. public void run() {
  8. try {
  9. semaphore.acquire();
  10. CommonTask.doTask();
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. } finally {
  14. semaphore.release();
  15. }
  16. }
  17. }

在主程序中执行了10个任务,它们共享一个Semaphore类。该类使用两个共享资源初始化,这样就可以同时运行两个任务。

  1. public static void main(String[] args) {
  2. Semaphore semaphore=new Semaphore(2);
  3. ThreadPoolExecutor executor=(ThreadPoolExecutor)
  4. Executors.newCachedThreadPool();
  5. for (int i=0; i<10; i++) {
  6. executor.execute(new SemaphoreTask(semaphore));
  7. }
  8. executor.shutdown();
  9. try {
  10. executor.awaitTermination(1, TimeUnit.DAYS);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }

下面的屏幕截图展示了该例的执行结果。可以看出有两个任务在同时运行。

11.2 同步机制 - 图2

11.2.4 CountDownLatch

该类提供了一种等待一个或多个并发任务完成的机制。它有一个内部计数器,必须使用要等待的任务数初始化。然后,await()方法休眠调用线程,直到内部计数器为0,并且使用countDown()方法对该内部计数器做递减操作。

例如,在该任务中使用countDown()方法对CountDownLatch对象(作为构造函数的参数)的内部计数器做递减操作。

  1. public class CountDownTask implements Runnable {
  2. private CountDownLatch countDownLatch;
  3. public CountDownTask(CountDownLatch countDownLatch) {
  4. this.countDownLatch=countDownLatch;
  5. }
  6. @Override
  7. public void run() {
  8. CommonTask.doTask();
  9. countDownLatch.countDown();
  10. }
  11. }

然后,在main()方法中,在执行器中执行这些任务,并且使用CountDownLatch类的await()方法等待任务完成。countDownLatch对象采用要等待的任务数进行初始化。

  1. public static void main(String[] args) {
  2. CountDownLatch countDownLatch=new CountDownLatch(10);
  3. ThreadPoolExecutor executor=(ThreadPoolExecutor)
  4. Executors.newCachedThreadPool();
  5. System.out.println("Main: Launching tasks");
  6. for (int i=0; i<10; i++) {
  7. executor.execute(new CountDownTask(countDownLatch));
  8. }
  9. try {
  10. countDownLatch.await();
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. System.out.
  15. executor.shutdown();
  16. }

下面的屏幕截图展现了本例的执行结果。

11.2 同步机制 - 图3

11.2.5 CyclicBarrier

该类允许将一些任务同步到某个共同点。所有的任务都在该点等待,直到任务全部到达该点为止。从内部来看,该类还管理了一个内部计数器,用于记录尚未到达该点的任务。当一个任务到达指定点时,它要执行await()方法以等待其他任务。当所有任务都到达时,CyclicBarrier对象将它们唤醒,这样就能够继续执行。

当所有的参与方都到达后,该类允许执行另一个任务。为了实现这一点,要在该对象的构造函数中指定一个Runnable对象。

例如,我们实现了下面的Runnable接口,它采用一个CyclicBarrier对象来等待其他任务。

  1. public class BarrierTask implements Runnable {
  2. private CyclicBarrier barrier;
  3. public BarrierTask(CyclicBarrier barrier) {
  4. this.barrier=barrier;
  5. }
  6. @Override
  7. public void run() {
  8. System.out.println(Thread.currentThread().getName()+": Phase 1");
  9. CommonTask.doTask();
  10. try {
  11. barrier.await();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. } catch (BrokenBarrierException e) {
  15. e.printStackTrace();
  16. }
  17. System.out.println(Thread.currentThread().getName()+": Phase 2");
  18. }
  19. }

我们还实现了另一个Runnable对象,当所有的任务都执行了await()方法之后,它将被CyclicBarrier执行。

  1. public class FinishBarrierTask implements Runnable {
  2. @Override
  3. public void run() {
  4. System.out.println("FinishBarrierTask: All the tasks have finished");
  5. }
  6. }

最后,在main()方法中,在一个执行器中执行了10个任务。可以发现,CyclicBarrier对象是用我们要同步的任务数和FinishBarrierTask对象来初始化的。

  1. public static void main(String[] args) {
  2. CyclicBarrier barrier=new CyclicBarrier(10,new FinishBarrierTask());
  3. ThreadPoolExecutor executor=(ThreadPoolExecutor)
  4. Executors.newCachedThreadPool();
  5. for (int i=0; i<10; i++) {
  6. executor.execute(new BarrierTask(barrier));
  7. }
  8. executor.shutdown();
  9. try {
  10. executor.awaitTermination(1, TimeUnit.DAYS);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }

下面的屏幕截图展示了本例的执行结果。

11.2 同步机制 - 图4

可以看到,当所有的任务都到达调用await()方法的公共点时,将执行FinishBarrierTask,然后所有的任务都继续其执行过程。

11.2.6 CompletableFuture

这是在Java 8并发API中引入的一种同步机制,在Java 9中又有了一些新方法。它扩展了Future机制,为其赋予了更强的功能和更大的灵活性。它允许实现一个事件驱动的模型,链接那些只有当其他任务执行完毕后才执行的任务。与Future接口相同,CompletableFuture也必须采用操作要返回的结果类型进行参数化。和Future对象一样,CompletableFuture类表示的是异步计算的结果,只不过CompletableFuture的结果可以由任意线程确立。当计算正常结束时,该类采用complete()方法确定结果,而当计算出现异常时,则采用completeExceptionally()方法。如果两个或者多个线程调用同一CompletableFuturecomplete()方法或completeExceptionally()方法,那么只有第一个调用会起作用。

首先,可以使用构造函数创建CompletableFuture对象。在本例中,你需要使用前面介绍的complete()方法确定任务结果。不过,也可以使用runAsync()方法或者supplyAsync()创建一个任务结果。runAsync()方法执行一个Runnable对象并且返回CompletableFuture,这样计算就不能再返回任何结果了。supplyAsync()方法执行了Supplier接口的一个实现,它采用本次计算要返回的类型进行参数化。该Supplier接口提供了get()方法。在该方法中,需要包含任务代码并且返回任务生成的结果。在本例中,CompletableFuture的结果将作为Supplier接口的结果。

该类提供了大量方法,允许通过实现一个事件驱动的模型组织任务的执行顺序,一个任务只有在其之前的任务完成之后才会开始。这其中包括如下方法。

  • thenApplyAsync():该方法接收Function接口的一个实现(可以表示为一个lambda表达式)作为参数。该函数将在调用CompletableFuture完成后执行。该方法将返回CompletableFuture以获得Fuction的结果。
  • thenComposeAsync():该方法和thenApplyAsync()方法相似,但是当供给函数也返回CompletableFuture时很有用。
  • thenAcceptAsync():该方法和前一个方法相似,只不过其参数是Consumer接口的一个实现(也可以描述为一个lambda表达式);在这种情况下,计算不会返回结果。
  • thenRunAsync():该方法和前一个等价,只不过在这种情况下接收一个Runnable对象作为参数。
  • thenCombineAsync():该方法接收两个参数。第一个参数为另一个CompletableFuture实例,另一个参数是BiFunction接口的一个实现(可描述为一个lambda函数)。该BiFunction接口实现将在两个CompletableFuture(当前调用的和参数中的)都完成后执行。该方法将返回CompletableFuture以获取BiFunction的结果。
  • runAfterBothAsync():该方法接收两个参数。第一个参数为另一个CompletableFuture,而第二个参数为Runnable接口的一个实现,它将在两个CompletableFuture(当前调用的和参数中的)都完成后执行。
  • runAfterEitherAsync():该方法与前一个方法等价,只不过当其中一个CompletableFuture对象完成之后才会执行Runnable任务。
  • allOf():该方法接收CompletableFuture对象的一个变量列表作为参数。它将返回一个CompletableFuture对象,而该对象将在所有的CompletableFuture对象都完成之后返回其结果。
  • anyOf():该方法和前一个方法等价,只是返回的CompletableFuture对象会在其中一个CompletableFuture对象完成之后返回其结果。

最后,如果想要获取CompletableFuture返回的结果,可以使用get()方法或者join()方法。这两个方法都会阻塞调用线程,直到CompletableFuture完成之后返回其结果。这两个方法之间的主要区别在于,get()方法抛出ExecutionException(这是一个校验异常),而join()方法抛出RuntimeException(这是一个未校验异常)。因此,在不抛出异常的lambda(例如SupplierConsumerRunnable)内部,使用join()方法更为方便。

前面提到的大多数方法都有Async后缀。这意味着这些方法将使用ForkJoinPool.commonPool实例以并发方式执行。这些方法都有不带Async后缀的版本,它们将以串行方式执行(这就是说,与执行CompletableFuture的线程是同一个);还有带Async后缀并且以一个执行器实例作为额外参数的版本。这种情况下,CompletableFuture将在作为参数传递的执行器中以异步方式执行。

Java 9增加了一些方法,为CompletableFuture类赋予了更强的功能。

  • defaultExecutor():该方法用于返回并不接收Executor作为参数的那些异步操作的默认执行器。通常,它将是ForkJoinPool.commonPool()方法的返回值。
  • copy():该方法创建CompletableFuture对象的一个副本。如果原来的CompletableFuture正常完成,则副本方法也将正常完成并返回相同的值。如果原来的CompletableFuture异常完成,则副本方法也异常完成,并且抛出CompletionException异常。
  • completeAsync(): 该方法接收一个Supplier对象作为参数(还可以选择Executor)。借助Supplier的结果完成CompletableFuture
  • orTimeout():该方法接收一段时延(一段时间和一个TimeUnit)。如果CompletableFuture在这段时间之后没有完成,那么抛出TimeoutException异常并异常完成。
  • completeOnTimeout():该方法与上一个方法相似,只不过它在作为参数的值的范围内正常完成。
  • delayedExecutor():该方法返回一个Executor,该执行器在执行指定时延之后执行某一任务。

使用CompletableFuture

在本例中,你将学会如何使用CompletableFuture类以并发方式执行一些异步任务。我们将用由亚马逊的20 000个商品构成的集合实现下面的任务树。

11.2 同步机制 - 图5

首先要使用这些范例(商品)。然后执行四个并发任务。第一个任务是搜索商品。当搜索完成后,将结果写入一个文件。第二个任务是获得评价最高的商品。第三个任务是获得销量最佳的商品。当这两个任务完成之后,将使用另一个任务将它们的信息连接起来。最后,第四个任务是获取购买过商品的用户列表。main()方法将等待所有任务结束,然后输出结果。

下面看看实现过程的细节。

  • 辅助任务

在本例中,将用到一些辅助任务。第一个任务是LoadTask,用于从磁盘加载商品信息并且返回一个Product对象列表。

  1. public class LoadTask implements Supplier<List<Product>> {
  2. private Path path;
  3. public LoadTask (Path path) {
  4. this.path=path;
  5. }
  6. @Override
  7. public List<Product> get() {
  8. List<Product> productList=null;
  9. try {
  10. productList = Files.walk(path, FileVisitOption.FOLLOW_LINKS)
  11. .parallel().filter(f -> f.toString()
  12. .endsWith(".txt")).map(ProductLoader::load)
  13. .collect (Collectors.toList());
  14. } catch (IOException e) {
  15. e.printStackTrace();
  16. }
  17. return productList;
  18. }
  19. }

该任务实现了Supplier接口,将其作为CompletableFuture执行。从内部来看,它使用一个流来处理和解析所有包含商品列表的文件。

第二个任务是SearchTask,该任务将实现对Product对象列表的搜索,查找在名称中含有某一单词的对象。该任务是Function接口的一个实现。

  1. public class SearchTask implements Function<List<Product>,
  2. List<Product>> {
  3. private String query;
  4. public SearchTask(String query) {
  5. this.query=query;
  6. }
  7. @Override
  8. public List<Product> apply(List<Product> products) {
  9. System.out.println(new Date()+": CompletableTask: start");
  10. List<Product> ret = products.stream()
  11. .filter(product -> product.getTitle()
  12. .toLowerCase().contains(query))
  13. .collect(Collectors.toList());
  14. System.out.println(new Date()+": CompletableTask: end:
  15. "+ret.size());
  16. return ret;
  17. }
  18. }

它接收含有全部商品信息的List,返回一个含有满足标准的商品的List。从内部来看,它基于输入列表创建了流,对其进行筛选,并且将结果收集到另一个列表中。

最后,WriteTask将搜索任务中获得的商品写入一个File对象。在我们的例子中生成了一个HTML文件,不过也可以以想要的格式输出这一信息。该任务实现了Consumer接口,这样它的代码就必须采用如下形式。

  1. public class WriteTask implements Consumer<List<Product>> {
  2. @Override
  3. public void accept(List<Product> products) {
  4. // 实现部分省略
  5. }
  6. }
  • main()方法

我们在main()方法中对这些任务进行了组织。首先,使用CompletableFuture类的supplyAsync()方法执行LoadTask。在LoadTask开始之前将等待3秒,以展示delayExecutor()方法如何工作。

  1. public class CompletableMain {
  2. public static void main(String[] args) {
  3. Path file = Paths.get("data","category");
  4. System.out.println(new Date() + ": Main: Loading products
  5. after three seconds....");
  6. LoadTask loadTask = new LoadTask(file);
  7. CompletableFuture<List<Product>>loadFuture = CompletableFuture
  8. .supplyAsync(loadTask,CompletableFuture
  9. .delayedExecutor(3, TimeUnit.SECONDS));

然后,有了生成的CompletableFuture,就可以在加载任务完成之后使用thenApplyAsync()执行搜索任务。

  1. System.out.println(new Date() + ": Main: Then apply for
  2. search");
  3. CompletableFuture<List<Product>> completableSearch = loadFuture
  4. .thenApplyAsync(new SearchTask("love"));

一旦搜索任务完成,就要将执行结果输出到一个文件。由于该任务并不返回结果,我们使用thenAcceptAsync()方法。

  1. CompletableFuture<Void> completableWrite = completableSearch
  2. .thenAcceptAsync(new WriteTask());
  3. completableWrite.exceptionally(ex -> {
  4. System.out.println(new Date() + ": Main: Exception "
  5. + ex.getMessage());
  6. return null;
  7. });

如果写入任务抛出异常,那么使用exceptionally()方法指定要做的事项。

然后,在completableFuture对象上使用thenApplyAsync()方法执行该任务,以便获取购买某一商品的用户列表。将该任务描述为一个lambda表达式。请注意,该任务并不会与搜索任务并行执行。

  1. System.out.println(new Date() + ": Main: Then apply for users");
  2. CompletableFuture<List<String>> completableUsers = loadFuture
  3. .thenApplyAsync(resultList -> {
  4. System.out.println(new Date()+ ": Main: Completable users: start");
  5. List<String> users = resultList.stream()
  6. .flatMap(p -> p.getReviews().stream())
  7. .map(review -> review.getUser())
  8. .distinct()
  9. .collect(Collectors.toList());
  10. System.out.println(new Date() + ": Main: Completable users: end");
  11. return users;
  12. });

并行处理这些任务时,我们还使用thenApplyAsync()方法执行了该任务,以便查找评价最高的商品和销量最佳的商品。我们也使用一个lambda表达式定义了这些任务。

  1. System.out.println(new Date() + ": Main: Then apply for best
  2. rated product....");
  3. CompletableFuture<Product> completableProduct = loadFuture
  4. .thenApplyAsync(resultList -> {
  5. Product maxProduct = null;
  6. double maxScore = 0.0;
  7. System.out.println(new Date() + ": Main: Completable product:
  8. start");
  9. for (Product product : resultList) {
  10. if (!product.getReviews().isEmpty()) {
  11. double score = product.getReviews().stream()
  12. .mapToDouble(review -> review.getValue())
  13. .average().getAsDouble();
  14. if (score > maxScore) {
  15. maxProduct = product;
  16. maxScore = score;
  17. }
  18. }
  19. }
  20. System.out.println(new Date() + ": Main: Completable product : end");
  21. return maxProduct;
  22. });
  23. System.out.println(new Date() + ": Main: Then apply for best
  24. selling product....");
  25. CompletableFuture<Product> completableBestSellingProduct =
  26. loadFuture.thenApplyAsync(resultList -> {
  27. System.out.println(new Date() + ": Main: Completable best
  28. selling: start");
  29. Product bestProduct = resultList.stream()
  30. .min(Comparator.comparingLong
  31. (Product::getSalesrank))
  32. .orElse(null);
  33. System.out.println(new Date() + ": Main: Completable best
  34. selling: end");
  35. return bestProduct;
  36. });

如前所述,我们想将前两个任务的结果连到一起。可以使用thenCombineAsync()方法完成这一工作,用它指定一个将在这两个任务完成之后执行的任务。

  1. CompletableFuture<String> completableProductResult =
  2. completableBestSellingProduct
  3. .thenCombineAsync(
  4. completableProduct, (bestSellingProduct,
  5. bestRatedProduct) -> {
  6. System.out.println(new Date() + ": Main: Completable product
  7. result: start");
  8. String ret = "The best selling product is "
  9. + bestSellingProduct.getTitle() + "\n";
  10. ret += "The best rated product is "
  11. + bestRatedProduct.getTitle();
  12. System.out.println(new Date() + ": Main: Completable product
  13. result: end");
  14. return ret;
  15. });

最后,使用completeOnTimeout()方法预留1秒钟,以等待completableProductResult任务完成。如果它在1秒之内没有完成,那么完成CompletableFuture,并得出结果TimeOut。然后,使用allOf()方法和join()方法等待最终任务结束,并且输出使用get()方法获得的结果。

  1. System.out.println(new Date() + ": Main: Waiting for results");
  2. completableProductResult.completeOnTimeout("TimeOut", 1,
  3. TimeUnit.SECONDS);
  4. CompletableFuture<Void> finalCompletableFuture = CompletableFuture
  5. .allOf(completableProductResult, completableUsers,
  6. completableWrite);
  7. finalCompletableFuture.join();
  8. try {
  9. System.out.println("Number of loaded products: "
  10. + loadFuture.get().size());
  11. System.out.println("Number of found products: "
  12. + completableSearch.get().size());
  13. System.out.println("Number of users: "
  14. + completableUsers.get().size());
  15. System.out.println("Best rated product: "
  16. + completableProduct.get().getTitle());
  17. System.out.println("Best selling product: "
  18. + completableBestSellingProduct.get()
  19. .getTitle());
  20. System.out.println("Product result:
  21. "+completableProductResult.get());
  22. } catch (InterruptedException | ExecutionException e) {
  23. e.printStackTrace();
  24. }

在下面的屏幕截图中,可以看到本例的执行结果。

11.2 同步机制 - 图6

首先,main()方法执行所有配置并等待任务完成。这些任务按照配置的顺序执行。可以看到,LoadTask在三秒钟之后启动,以及completableProductResult返回字符串TimeOut,因为它在1秒钟之内还没有完成。