13.4 Scala的并发处理
Scala是一种通用的多范式编程语言,具有面向对象和函数式编程的特点。它的代码被编译成Java字节码。它提供了Java互操作性,因此你可以在Scala代码中使用Java元素(包括Java并发API),也可以在Java程序中使用用Scala编写的库。
正如我们在介绍Clojure和Groovy时提到的,本节的主要目的不是介绍Scala编程语言及其安装和配置。可通过The Scala Programming Language官网下载使用Scala工作所需的工具。你可以在IDE中安装插件以获得Scala支持环境。例如,Eclipse就有Scala IDE插件,可以通过Eclipse Marketplace进行安装。
Scala并发模型基于Future和Promise。Future存储了一个还不存在的值,该值将由一个异步任务来计算,而该任务将由另一个线程执行。Future使用一种非阻塞机制,并且当该值可用时(或者发生错误时)利用回调函数来处理该值。Promise是一种机制,它可以让你完成(给定一个值)Future。
ExecutionContext对象是Scala并发API中非常重要的一个元素。它负责执行应用程序中启动的Future对象。默认情况下,它由Java并发API的ForkJoinPool支持,不过你也可以创建一个不同的线程池。对于大多数需求而言,都可以使用默认的ExecutionContext,注意包含以下语句。
import ExecutionContext.Implicits.global
在代码的import部分,必须要包含该语句。
13.4.1 Scala中的Future对象
正如前面提到的,Future存储的值还不存在,但是在将来某个时候可用。这个值将由一个异步任务来计算,而该任务将由另一个线程执行。大多数情况下,要在定义Future之时指定该任务,并且任务将按照计划在将来某一时刻开始执行。
Future不使用阻塞机制来获得结果。你可以将一个或多个回调函数关联在一起,当Future在其进程中有一个值或异常发生时再执行。
Future有两个可能的返回值。如果任务在没有错误的情况下结束执行并返回一个值,那么我们说Future已经成功完成并将执行成功回调函数。当一个Future抛出异常时,Future执行失败,并执行其故障回调函数。
创建Future最简单的方法是使用Future类的apply()方法。该方法创建并调度一个异步计算,该计算将执行apply()方法中指定的代码。该方法将返回Future对象。
我们可以将不同的Future回调函数关联起来处理其结果。这些回调函数有如下几种。
onComplete:该函数在Future结束执行时调用,无论是成功结束还是错误结束。在该函数的代码中,应该包含用于区分Future是否错误完成的代码。onSuccess:该函数在Future成功执行完毕时调用。onFailure:该函数在Future结束执行并抛出异常时调用。
让我们看一些在Scala中使用Future的例子。创建一个名为Task的类和一个名为doAction()的方法。该方法将接收一个String对象和一个Int值作为参数,并且返回一个String对象。在内部,它输出关于执行任务的线程的信息,将线程休眠参数中指定的秒数,并且返回一个String对象。
class Task {def doAction(name : String, number: Int) : String = {var result : String = "";println(Thread.currentThread().getName()+": "+name+": Startingexecution");TimeUnit.SECONDS.sleep(number);println(Thread.currentThread().getName()+": "+name+": Endexecution");result = name +" has been sleeping for " + number + " seconds ";return result;}}
现在,对Future类做一些测试。首先,为本例包含所有必需的类,并且使用main()方法创建一个名为TestConcurrency的对象。
import scala.concurrent.ExecutionContextimport java.util.concurrent.ThreadPoolExecutorimport java.util.concurrent.Executorsimport scala.concurrent.Futureimport ExecutionContext.Implicits.globalimport scala.util.{Success, Failure}import java.util.concurrent.TimeUnitobject TestConcurrency {def main(args: Array[String]) {
然后,使用Future类创建10个Future对象。每个Future将创建一个Task对象并且调用doAction()方法。
for (i <- 1 to 10 ) {val result : Future[String] = Future {var task : Task = new Task();task.doAction("Task "+i,i);}
然后,将onComplete回调函数与结果Future对象相关联。如果Future以异常方式(出现故障)结束,我们将输出一条消息。否则,我们将输出Future所返回的值。
result onComplete {case Success(value) => println(value)case Failure(e) => println("An error has occured: "+e.getMessage)}}TimeUnit.SECONDS.sleep(20)}}
下面的屏幕截图显示了执行本例后的输出。

现在,创建一个名为TestConcurrency2的类。该类与TestConcurrency类相似,但是也有一处重要区别。在本例中,我们使用两个不同的回调函数。当Future成功结束时,调用onSuccess()回调函数。当Future异常结束时,则调用onFailure()方法。
import scala.concurrent.ExecutionContextimport java.util.concurrent.ThreadPoolExecutorimport java.util.concurrent.Executorsimport scala.concurrent.Futureimport ExecutionContext.Implicits.globalimport scala.util.{Success, Failure}import java.util.concurrent.TimeUnitobject TestConcurrency2 {def main(args: Array[String]) {for (i <- 1 to 10 ) {val result : Future[String] = Future {var task : Task = new Task();task.doAction("Task "+i,i);}result onSuccess {case value => println(value);}result onFailure {case e => println("An error has ocurred: "+e.getMessage);}}TimeUnit.SECONDS.sleep(20)}}
现在我们将实现相同的版本,只不过使用我们的ExecutionContext类。创建一个名为TestConcurrency3的类。要创建ExecutionContext对象,需要用到ExecutionContext类的fromExecutor()方法。将这些方法传递给Executor对象,它将用于执行ExecutionContext的任务。使用Executor类的newFixedThreadPool()方法创建一个拥有10个执行线程的执行器。
import scala.concurrent.ExecutionContextimport java.util.concurrent.ThreadPoolExecutorimport java.util.concurrent.Executorsimport scala.concurrent.Futureimport scala.util.{Success, Failure}import java.util.concurrent.TimeUnitobject TestConcurrency3 {def main(args: Array[String]) {implicit val ec : ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10));for (i <- 1 to 10 ) {val result : Future[String] = Future {var task : Task = new Task();task.doAction("Task "+i,i);}result onSuccess {case value => println(value);}result onFailure {case e => println("An error has ocurred: "+e.getMessage);}}TimeUnit.SECONDS.sleep(20)}}
现在,让我们做一个测试,看看当Future抛出异常时会发生什么。创建一个名为Task的类,并且添加一个名为doAction()的方法,该方法接收两个参数,即一个名为name的String对象和一个名为number的Int值。如果number等于3,则doAction()方法抛出异常。否则,使之与此前介绍的Task类具有相同的行为。
class Task {def doAction(name : String, number: Int) : String = {var result : String = "";if (number == 3) {throw new Exception("Error");}println(Thread.currentThread().getName()+": "+name+": Startingexecution");TimeUnit.SECONDS.sleep(number);println(Thread.currentThread().getName()+": "+name+": Endexeuction");result = name +" has been sleeping for " + number + " seconds ";return result;}}
然后,我们创建TestConcurrency类,该类创建10个Future对象,并将其与onComplete()回调函数关联到一起。
object TestConcurrency {def main(args: Array[String]) {// 含有错误的第一个例子for (i <- 1 to 10 ) {val result : Future[String] = Future {var task : Task = new Task();task.doAction("Task "+i,i);}result onComplete {case Success(value) => println(value)case Failure(e) => println("An error has occured: "+e.getMessage)}}TimeUnit.SECONDS.sleep(20)}}
下面的屏幕截图显示了执行本例后的输出。

当doAction()方法采用参数3执行时,该方法抛出一个异常,与该Future相关联的回调函数执行onComplete()方法的Failure情况,并输出前面屏幕截图中的错误消息。
在前面的例子中,我们只为每个事件(不管成功还是失败)关联一个回调函数,不过,你可以为每个事件关联多个回调函数。让我们看一个例子。你可以使用前面的Task对象之一,但是让我们创建一个新的TestConcurrency类。我们将onComplete()和onSuccess()回调函数关联到每个Future。你甚至可以将同一类型的多个回调函数(多个onComplete()、onSuccess()或onFailure()函数)关联到一个Future。
object TestConcurrency {def main(args: Array[String]) {for (i <- 1 to 10 ) {val result : Future[String] = Future {var task : Task = new Task();task.doAction("Task "+i,i);}result onComplete {case Success(value) => println(value)case Failure(e) => println("An error has occured: "+e.getMessage)}result onSuccess {case value => println("Second callback: "+value);}}TimeUnit.SECONDS.sleep(20)}}
下面的屏幕截图显示了执行本例后的输出结果。

Future对象提供的另一个选项是将两个Future对象的执行关联起来;也就是说,可以确保一个Future在另一个Future执行结束后开始执行,并且使用后者的结果作为参数。来看一个使用该功能的例子。
首先,创建一个名为Step1的类,它含有一个名为doAction()的方法,该方法接收一个字符串和一个数值作为参数,并且返回一个字符串。
class Step1 {def doAction(name : String, number: Int) : String = {var result : String = "";println(Thread.currentThread().getName()+": "+name+": Step 1:Starting execution");TimeUnit.SECONDS.sleep(number);println(Thread.currentThread().getName()+": "+name+": Step 1: Endexeuction");result = name +" has been sleeping for " + number + " seconds ";return result;}}
然后,创建一个类似的名为Step2的类。
class Step2 {def doAction(name: String, msg : String) : String = {var result : String = "";println(Thread.currentThread().getName()+": "+name+": Step 2:Starting execution");result = name +" has executed Step 2: "+msg;println(Thread.currentThread().getName()+": "+name+": Step 2: Endexeuction");return result;}}
最后,创建一个名为TestConcurrency的对象以及main()方法,其中有一个将执行10次的循环。
object TestConcurrency {def main(args: Array[String]) {for (i <- 1 to 10 ) {
然后,创建第一个Future,它将创建Step1类的一个对象,并且调用doAction()方法。
var name : String = "Task "+i;val result : Future[String] = Future {var task : Step1 = new Step1();task.doAction(name,i);}
然后,使用map()函数将一个Future连接到结果Future对象。该Future创建Step2类的一个对象,并且调用doAction()方法。
val result2 = result map { value =>var task : Step2 = new Step2();task.doAction(name, value);}
在该Future的主体中指定的value参数是第一个Future的结果。
最后,将onSuccess()回调函数与第二个Future关联,以便在控制台中输出结果。下面的屏幕截图显示了执行本例后的输出结果。

你可以看到,直有当第一个Future完成执行,第二个Future才会开始执行。
13.4.2 Promise
Promise是一种可以用来完成Future的机制。首先,创建Promise类的一个对象,然后使用该对象来创建该Promise要完成的Future。可以将回调函数与该Future关联起来,这样,当使用success或failure方法为Promise赋值时,Future将完成而且回调函数将被执行。
我们来看看这个机制是如何运作的。创建一个名为TestConcurrency的对象,其中含有main()方法,并且创建一个Promise对象和一个Future对象。
object TestConcurrency {def main(args: Array[String]) {val promise : Promise[String] = Promise[String]()val future : Future[String] = promise.future;
使用Promise的构造函数创建Promise对象,并且使用Promise对象的future()方法创建与该Promise相关联的Future。
现在,将回调函数关联到对象future。
future onSuccess {case value => println("The future has been completed: "+value)}
此后,执行另一个Future来完成该Promise。在本例中,我们使用success()方法为Promise赋值并且完成该Future。
Future {promise success "Hola Mundo";}
最后,使用Await类的ready()方法等待10秒钟,待Future结束。
Await.ready(future, 10 seconds);}}
当你执行本例时,会看到onSuccess()函数输出的消息。当你执行Promise的success方法时,将完成Future并执行其onSuccess()回调函数。
