13.4 Scala的并发处理

Scala是一种通用的多范式编程语言,具有面向对象和函数式编程的特点。它的代码被编译成Java字节码。它提供了Java互操作性,因此你可以在Scala代码中使用Java元素(包括Java并发API),也可以在Java程序中使用用Scala编写的库。

正如我们在介绍Clojure和Groovy时提到的,本节的主要目的不是介绍Scala编程语言及其安装和配置。可通过The Scala Programming Language官网下载使用Scala工作所需的工具。你可以在IDE中安装插件以获得Scala支持环境。例如,Eclipse就有Scala IDE插件,可以通过Eclipse Marketplace进行安装。

Scala并发模型基于Future和Promise。Future存储了一个还不存在的值,该值将由一个异步任务来计算,而该任务将由另一个线程执行。Future使用一种非阻塞机制,并且当该值可用时(或者发生错误时)利用回调函数来处理该值。Promise是一种机制,它可以让你完成(给定一个值)Future。

ExecutionContext对象是Scala并发API中非常重要的一个元素。它负责执行应用程序中启动的Future对象。默认情况下,它由Java并发API的ForkJoinPool支持,不过你也可以创建一个不同的线程池。对于大多数需求而言,都可以使用默认的ExecutionContext,注意包含以下语句。

  1. import ExecutionContext.Implicits.global

在代码的import部分,必须要包含该语句。

13.4.1 Scala中的Future对象

正如前面提到的,Future存储的值还不存在,但是在将来某个时候可用。这个值将由一个异步任务来计算,而该任务将由另一个线程执行。大多数情况下,要在定义Future之时指定该任务,并且任务将按照计划在将来某一时刻开始执行。

Future不使用阻塞机制来获得结果。你可以将一个或多个回调函数关联在一起,当Future在其进程中有一个值或异常发生时再执行。

Future有两个可能的返回值。如果任务在没有错误的情况下结束执行并返回一个值,那么我们说Future已经成功完成并将执行成功回调函数。当一个Future抛出异常时,Future执行失败,并执行其故障回调函数。

创建Future最简单的方法是使用Future类的apply()方法。该方法创建并调度一个异步计算,该计算将执行apply()方法中指定的代码。该方法将返回Future对象。

我们可以将不同的Future回调函数关联起来处理其结果。这些回调函数有如下几种。

  • onComplete:该函数在Future结束执行时调用,无论是成功结束还是错误结束。在该函数的代码中,应该包含用于区分Future是否错误完成的代码。
  • onSuccess:该函数在Future成功执行完毕时调用。
  • onFailure:该函数在Future结束执行并抛出异常时调用。

让我们看一些在Scala中使用Future的例子。创建一个名为Task的类和一个名为doAction()的方法。该方法将接收一个String对象和一个Int值作为参数,并且返回一个String对象。在内部,它输出关于执行任务的线程的信息,将线程休眠参数中指定的秒数,并且返回一个String对象。

  1. class Task {
  2. def doAction(name : String, number: Int) : String = {
  3. var result : String = "";
  4. println(Thread.currentThread().getName()+": "+name+": Starting
  5. execution");
  6. TimeUnit.SECONDS.sleep(number);
  7. println(Thread.currentThread().getName()+": "+name+": End
  8. execution");
  9. result = name +" has been sleeping for " + number + " seconds ";
  10. return result;
  11. }
  12. }

现在,对Future类做一些测试。首先,为本例包含所有必需的类,并且使用main()方法创建一个名为TestConcurrency的对象。

  1. import scala.concurrent.ExecutionContext
  2. import java.util.concurrent.ThreadPoolExecutor
  3. import java.util.concurrent.Executors
  4. import scala.concurrent.Future
  5. import ExecutionContext.Implicits.global
  6. import scala.util.{Success, Failure}
  7. import java.util.concurrent.TimeUnit
  8. object TestConcurrency {
  9. def main(args: Array[String]) {

然后,使用Future类创建10个Future对象。每个Future将创建一个Task对象并且调用doAction()方法。

  1. for (i <- 1 to 10 ) {
  2. val result : Future[String] = Future {
  3. var task : Task = new Task();
  4. task.doAction("Task "+i,i);
  5. }

然后,将onComplete回调函数与结果Future对象相关联。如果Future以异常方式(出现故障)结束,我们将输出一条消息。否则,我们将输出Future所返回的值。

  1. result onComplete {
  2. case Success(value) => println(value)
  3. case Failure(e) => println("An error has occured: "
  4. +e.getMessage)
  5. }
  6. }
  7. TimeUnit.SECONDS.sleep(20)
  8. }
  9. }

下面的屏幕截图显示了执行本例后的输出。

13.4 Scala的并发处理 - 图1

现在,创建一个名为TestConcurrency2的类。该类与TestConcurrency类相似,但是也有一处重要区别。在本例中,我们使用两个不同的回调函数。当Future成功结束时,调用onSuccess()回调函数。当Future异常结束时,则调用onFailure()方法。

  1. import scala.concurrent.ExecutionContext
  2. import java.util.concurrent.ThreadPoolExecutor
  3. import java.util.concurrent.Executors
  4. import scala.concurrent.Future
  5. import ExecutionContext.Implicits.global
  6. import scala.util.{Success, Failure}
  7. import java.util.concurrent.TimeUnit
  8. object TestConcurrency2 {
  9. def main(args: Array[String]) {
  10. for (i <- 1 to 10 ) {
  11. val result : Future[String] = Future {
  12. var task : Task = new Task();
  13. task.doAction("Task "+i,i);
  14. }
  15. result onSuccess {
  16. case value => println(value);
  17. }
  18. result onFailure {
  19. case e => println("An error has ocurred: "+e.getMessage);
  20. }
  21. }
  22. TimeUnit.SECONDS.sleep(20)
  23. }
  24. }

现在我们将实现相同的版本,只不过使用我们的ExecutionContext类。创建一个名为TestConcurrency3的类。要创建ExecutionContext对象,需要用到ExecutionContext类的fromExecutor()方法。将这些方法传递给Executor对象,它将用于执行ExecutionContext的任务。使用Executor类的newFixedThreadPool()方法创建一个拥有10个执行线程的执行器。

  1. import scala.concurrent.ExecutionContext
  2. import java.util.concurrent.ThreadPoolExecutor
  3. import java.util.concurrent.Executors
  4. import scala.concurrent.Future
  5. import scala.util.{Success, Failure}
  6. import java.util.concurrent.TimeUnit
  7. object TestConcurrency3 {
  8. def main(args: Array[String]) {
  9. implicit val ec : ExecutionContext = ExecutionContext
  10. .fromExecutor(Executors.newFixedThreadPool(10));
  11. for (i <- 1 to 10 ) {
  12. val result : Future[String] = Future {
  13. var task : Task = new Task();
  14. task.doAction("Task "+i,i);
  15. }
  16. result onSuccess {
  17. case value => println(value);
  18. }
  19. result onFailure {
  20. case e => println("An error has ocurred: "+e.getMessage);
  21. }
  22. }
  23. TimeUnit.SECONDS.sleep(20)
  24. }
  25. }

现在,让我们做一个测试,看看当Future抛出异常时会发生什么。创建一个名为Task的类,并且添加一个名为doAction()的方法,该方法接收两个参数,即一个名为nameString对象和一个名为numberInt值。如果number等于3,则doAction()方法抛出异常。否则,使之与此前介绍的Task类具有相同的行为。

  1. class Task {
  2. def doAction(name : String, number: Int) : String = {
  3. var result : String = "";
  4. if (number == 3) {
  5. throw new Exception("Error");
  6. }
  7. println(Thread.currentThread().getName()+": "+name+": Starting
  8. execution");
  9. TimeUnit.SECONDS.sleep(number);
  10. println(Thread.currentThread().getName()+": "+name+": End
  11. exeuction");
  12. result = name +" has been sleeping for " + number + " seconds ";
  13. return result;
  14. }
  15. }

然后,我们创建TestConcurrency类,该类创建10个Future对象,并将其与onComplete()回调函数关联到一起。

  1. object TestConcurrency {
  2. def main(args: Array[String]) {
  3. // 含有错误的第一个例子
  4. for (i <- 1 to 10 ) {
  5. val result : Future[String] = Future {
  6. var task : Task = new Task();
  7. task.doAction("Task "+i,i);
  8. }
  9. result onComplete {
  10. case Success(value) => println(value)
  11. case Failure(e) => println("An error has occured: "
  12. +e.getMessage)
  13. }
  14. }
  15. TimeUnit.SECONDS.sleep(20)
  16. }
  17. }

下面的屏幕截图显示了执行本例后的输出。

13.4 Scala的并发处理 - 图2

doAction()方法采用参数3执行时,该方法抛出一个异常,与该Future相关联的回调函数执行onComplete()方法的Failure情况,并输出前面屏幕截图中的错误消息。

在前面的例子中,我们只为每个事件(不管成功还是失败)关联一个回调函数,不过,你可以为每个事件关联多个回调函数。让我们看一个例子。你可以使用前面的Task对象之一,但是让我们创建一个新的TestConcurrency类。我们将onComplete()onSuccess()回调函数关联到每个Future。你甚至可以将同一类型的多个回调函数(多个onComplete()onSuccess()onFailure()函数)关联到一个Future。

  1. object TestConcurrency {
  2. def main(args: Array[String]) {
  3. for (i <- 1 to 10 ) {
  4. val result : Future[String] = Future {
  5. var task : Task = new Task();
  6. task.doAction("Task "+i,i);
  7. }
  8. result onComplete {
  9. case Success(value) => println(value)
  10. case Failure(e) => println("An error has occured: "
  11. +e.getMessage)
  12. }
  13. result onSuccess {
  14. case value => println("Second callback: "+value);
  15. }
  16. }
  17. TimeUnit.SECONDS.sleep(20)
  18. }
  19. }

下面的屏幕截图显示了执行本例后的输出结果。

13.4 Scala的并发处理 - 图3

Future对象提供的另一个选项是将两个Future对象的执行关联起来;也就是说,可以确保一个Future在另一个Future执行结束后开始执行,并且使用后者的结果作为参数。来看一个使用该功能的例子。

首先,创建一个名为Step1的类,它含有一个名为doAction()的方法,该方法接收一个字符串和一个数值作为参数,并且返回一个字符串。

  1. class Step1 {
  2. def doAction(name : String, number: Int) : String = {
  3. var result : String = "";
  4. println(Thread.currentThread().getName()+": "+name+": Step 1:
  5. Starting execution");
  6. TimeUnit.SECONDS.sleep(number);
  7. println(Thread.currentThread().getName()+": "+name+": Step 1: End
  8. exeuction");
  9. result = name +" has been sleeping for " + number + " seconds ";
  10. return result;
  11. }
  12. }

然后,创建一个类似的名为Step2的类。

  1. class Step2 {
  2. def doAction(name: String, msg : String) : String = {
  3. var result : String = "";
  4. println(Thread.currentThread().getName()+": "+name+": Step 2:
  5. Starting execution");
  6. result = name +" has executed Step 2: "+msg;
  7. println(Thread.currentThread().getName()+": "+name+": Step 2: End
  8. exeuction");
  9. return result;
  10. }
  11. }

最后,创建一个名为TestConcurrency的对象以及main()方法,其中有一个将执行10次的循环。

  1. object TestConcurrency {
  2. def main(args: Array[String]) {
  3. for (i <- 1 to 10 ) {

然后,创建第一个Future,它将创建Step1类的一个对象,并且调用doAction()方法。

  1. var name : String = "Task "+i;
  2. val result : Future[String] = Future {
  3. var task : Step1 = new Step1();
  4. task.doAction(name,i);
  5. }

然后,使用map()函数将一个Future连接到结果Future对象。该Future创建Step2类的一个对象,并且调用doAction()方法。

  1. val result2 = result map { value =>
  2. var task : Step2 = new Step2();
  3. task.doAction(name, value);
  4. }

在该Future的主体中指定的value参数是第一个Future的结果。

最后,将onSuccess()回调函数与第二个Future关联,以便在控制台中输出结果。下面的屏幕截图显示了执行本例后的输出结果。

13.4 Scala的并发处理 - 图4

你可以看到,直有当第一个Future完成执行,第二个Future才会开始执行。

13.4.2 Promise

Promise是一种可以用来完成Future的机制。首先,创建Promise类的一个对象,然后使用该对象来创建该Promise要完成的Future。可以将回调函数与该Future关联起来,这样,当使用success或failure方法为Promise赋值时,Future将完成而且回调函数将被执行。

我们来看看这个机制是如何运作的。创建一个名为TestConcurrency的对象,其中含有main()方法,并且创建一个Promise对象和一个Future对象。

  1. object TestConcurrency {
  2. def main(args: Array[String]) {
  3. val promise : Promise[String] = Promise[String]()
  4. val future : Future[String] = promise.future;

使用Promise的构造函数创建Promise对象,并且使用Promise对象的future()方法创建与该Promise相关联的Future。

现在,将回调函数关联到对象future

  1. future onSuccess {
  2. case value => println("The future has been completed: "+value)
  3. }

此后,执行另一个Future来完成该Promise。在本例中,我们使用success()方法为Promise赋值并且完成该Future。

  1. Future {
  2. promise success "Hola Mundo";
  3. }

最后,使用Await类的ready()方法等待10秒钟,待Future结束。

  1. Await.ready(future, 10 seconds);
  2. }
  3. }

当你执行本例时,会看到onSuccess()函数输出的消息。当你执行Promise的success方法时,将完成Future并执行其onSuccess()回调函数。