6.3 创建Akka项目
Akka是一个模块化工具包,用于创建健壮的分布式应用程序。它使用本章后面将深入介绍的Actor模型,并大量地使用Scala的函数式编程功能。这个库很大,因此本章只能演示其中的很小一部分。
有关这方面的更详细的信息,请访问Akka网站(http://akka.io/)。
建议你在创建这个项目期间将Akka文档放在身边。有关Akka文档的更详细信息,请参阅http://akka.io/docs/。
本节介绍如下主题:
- 在SBT构建文件中添加Akka依赖项;
- 更新Scala IDE项目;
- Akka概念;
- 创建Actor;
- 创建消息;
- 使用ScalaTest库对Actor进行单元测试;
- 编写可运行的应用程序。
6.3.1 在SBT构建文件中添加Akka依赖项
在Akka主文档网站,找到有关sbt的部分:

在列表中找到工件akka-actor,并将相应的行复制到剪贴板:
"com.typesafe.akka" %% "akka-actor" % "2.4.16"
在Eclipse IDE中,打开Package Explorer中的文件build.sbt,并对其做如下修改。
- 在包含
libraryDependencies的那行末尾添加一个逗号。 - 添加一个空行,在其中输入
libraryDependencies +=,再粘贴刚才从Akka文档中复制的内容,并在行尾添加一个逗号。
在前述工件列表中,找到akka-testkit,并重复刚才介绍的过程。确保最后一行末尾没有逗号,再在这行末尾添加% Test。
执行这些修改后,我的构建文件类似于下面这样:
import Dependencies._lazy val root = (project in file(".")).settings(inThisBuild(List(organization := "com.example",scalaVersion := "2.11.8",version := "0.1.0-SNAPSHOT")),name := "Hello",libraryDependencies += scalaTest % Test,libraryDependencies += "com.typesafe.akka" %% "akka-actor"% "2.4.16",libraryDependencies += "com.typesafe.akka" %% "akka-testkit"% "2.4.16" % Test)
通过在libraryDependencies条目末尾添加% Test,可让SBT确保相应的依赖只用于单元测试。运行主程序时,这些依赖项不会添加到classpath中,也不会随项目一起分发。按Ctrl + S保存所做的修改。
6.3.2 更新Scala IDE项目
更新文件build.sbt后,必须运行SBT,让它下载依赖项并将其添加到classpath中。在这里,还需使用插件sbteclipse更新Eclipse IDE项目,否则Eclipse IDE将看不到所做的修改。
在命令行中,切换到目录akka-quotes,再执行如下命令,让SBT下载新增的依赖并更新Eclipse项目:
sbt eclipse
SBT将下载新增的依赖项(以及它们的依赖项),还将相应地更新Eclipse IDE项目。过一会儿,你将看到如下消息:
[info] Successfully created Eclipse project files for project(s):[info] Hello
在Eclipse IDE中,右击Project Explorer中的项目akka-quotes并选择Refresh。如果一切顺利,你将在Package Explorer的Referenced Libraries部分看到条目akka-actor。
6.3.3 Akka概念
编写使用Akka工具包的Scala代码前,先来介绍一些Akka概念。前面说过,Akka使用了Actor模型。要编写Akka代码,你必须了解一些背景信息。本节介绍如下Akka概念:
- Actor;
- Actor引用(
ActorRef); - 消息;
调度器。
Actor
在Actor模型中,不直接调用类的方法,而向Actor发送消息。在Akka中,Actor是扩展了特质akka.actor.Actor并包含消息处理程序的类实例。消息处理程序是一个方法,能够处理Actor支持的所有消息。Actor的消息处理程序可将收到的消息传递给其他Actor,也可创建一条或多条新消息并将其传递给其他Actor,还可创建新的Actor。
到目前为止,我们没看到Actor模型有任何特殊之处,也没有看到它相对于直接调用方法有任何优点。Akka的一个强大功能是,Actor不仅可运行在单个应用程序的单个进程中,还可运行在不同的线程和进程中(如不同的JVM实例中)。Actor甚至还可运行在网络上的不同计算机中。只要应用程序按规则行事,开发人员就无需纠缠于与并行/并发编程相关的经典问题,如加锁、避免竞态条件等,而这些问题常常难以重现和调试。
前面说过,Actor可创建自己的Actor。创建其他Actor的Actor是子Actor的父Actor,也被视为所有子Actor的监管Actor。如果子Actor失败,它将向监管Actor发送消息,而后者将负责处理问题。监管Actor可采取如下方式来处理错误:
- 请求子Actor接着往下执行任务,并保持其状态不变;
- 请求子Actor重新执行任务,并清除其状态;
- 永久性停止任务;
- 向上提交问题;此时任务将失败。
为创建本地Actor,最简单的方法是使用ActorSystem实例的工厂方法actorOf。为此,必须将akka.actor.Props类作为参数,它包含指向要创建的Actor所属类的引用。Props是一个向方法actorOf提供配置数据的类,如下例所示:
import akka.actor.{ ActorSystem, Actor, Props }val system = ActorSystem("AkkaQuote")class MyActor extends Actor {def receive: Actor.Receive = { Actor.emptyBehavior }}val myActorRef = system.actorOf(Props[MyActor], "My-Actor")
现在不用输入上述代码。
- Actor引用(
ActorRef)
通常情况下,Akka没有提供直接访问Actor实例的途径,而是在创建Actor时返回一个ActorRef实例,用于向相应的Actor发送消息。请注意,ActorRef实例没有暴露Actor的内部细节。ActorRef是一个不可修改的、线程安全的对象,可通过消息将其安全地传递给其他Actor。
Akka返回ActorRef实例而不是Actor类本身的实例的原因之一是,Actor可能运行在网络上的不同计算机中。通过使用ActorRef实例,可访问每个Actor,而不管它运行在应用程序所在的进程中,还是运行在远程服务器上。
Actor的消息处理程序能够访问变量self和sender,其中self包含指向当前Actor的ActorRef实例的引用,而sender包含指向发送消息的Actor的ActorRef实例的引用。
单元测试可使用特殊类
TestActorRef来访问Actor的内部结构,这将在本章后面演示。
- 消息
消息可以是任何包含必要数据的类的实例。强烈建议只在消息中包含不可修改的数据,因为如果Actor可随便修改消息的状态,就可能引发典型的多线程问题,这在前一节讨论过。相反,状态应在Actor中处理。Scala中的Case类非常适合用作消息,因为Actor的消息处理程序能够轻松地处理这种类的实例。
当前,Akka不能保证目标Actor一定能够收到发送的消息。只有Akka模块persistence(通过第2章简要讨论过的对象关系管理器库与数据库交互的Akka模块)能够确保目标Actor一定会收到消息。然而,可在接收端和/或发送端添加一个自定义层来处理传输错误。
发送给Actor的消息存储在一个队列中,这个队列被称为Actor的邮箱。Akka提供了多种内置的邮箱实现,Actor可根据性能要求选择不同的实现。另外,在必要的情况下,也可创建自定义的邮箱实现。Akka将确保Actor尽快处理其邮箱内的消息。
- 调度器
调度器是一个线程池,Akka使用它来执行各种管理任务。它们确保发送的消息将放到目标Actor的邮箱中、在邮箱中等待的消息将被Actor处理、Actor请求的回调将被调用等。
Akka提供了默认的调度器实现,但你可以选择其他实现,还可自己实现调度器。
6.3.4 创建第一个Akka Actor——QuotesHandlerActor
我们将编写一个简单的Akka程序,它在内存中存储一个名言列表。一个Actor负责管理这个列表(添加引言以及请求随机引言),而另一个Actor将请求一个随机的引言并将其打印到控制台。
首先,将目录main和test中既有的文件都删除。为此,在Eclipse的Package Explorer中,右击目录src/main/scala中的example包,并选择Delete。在被问及你是否确定要这样做时,单击OK按钮。
对目录src/test/scala中的example包做同样的处理。我们将重打锣鼓新开张。
接下来,新建一个名为QuotesHandlerActor的类,并将其放在akkaquote.actor包中。为此,右击Package Explorer中的目录src/main/scala,并选择New>Scala Class。
输入类名akkaquote.actor.QuotesHandlerActor:

Scala IDE将创建指定的类和包,它生成的代码如下(删除了一些空行):
package akkaquote.actorclass QuotesHandlerActor {}
将光标放在QuotesHandlerActor后面,添加一个空格并输入extends Actor,再按Ctrl + 空格键。Scala IDE将显示一个列表,其中包含推荐的类名:

选择akka.actor包中的Actor类,并按回车。Scala IDE将自动添加相应的import语句。
这个类将用于存储引言,因此我们来实现一个可修改的列表。在这个类中,输入 val quotes = new ListB 并按Ctrl+空格键。将鼠标指向scala.collections.mutable包中的ListBuffer,并按回车:

这行代码将变成val quotes = new ListBuffer。现在,整个类的代码应类似于下面这样:
package akkaquote.actorimport akka.actor.Actorimport scala.collection.mutable.ListBufferclass QuotesHandlerActor extends Actor {val quotes = new ListBuffer}
现在添加两个空行,并按Ctrl + 1(在macOS上为cmd + 1),Scala IDE将显示一个可实现的方法列表。选择方法receive并按回车:

Scala IDE将为你编写如下方法签名:
def receive: Actor.Receive = {???}
这是负责对收到的消息进行处理的方法。当前,还没有可处理的消息,但Akka要求你必须返回一个有效的对象。将???替换为Actor.emptyBehavior,这告诉Akka,这个方法没有任何行为,这是有意为之的。
要处理消息,得先编写它们。下面就来编写。
6.3.5 创建消息
前面说过,消息可以是任何实例,它们所属的类无需扩展任何特质或基类,但强烈建议你使用Case类。这是因为Actor只有一个receive()方法,所有的消息都将发送给它。稍后你将看到,使用Case类的模式匹配功能来处理消息非常方便。
请创建一个新类,将其命名为Messages,并放在akkaquote.message包中。为节省空间,我们将在这个文件中定义所有的公有消息类。不同于Java,Scala允许在同一个源代码文件中包含多个公有类。在这个文件中,将生成的整个Messages类删除。我们首先来编写import语句以及定义引言的类:
package akkaquote.messageimport akka.actor.ActorRefclass Quote(val quote: String, val author: String)
Quote类定义了由文本和作者组成的引言,这两个变量都是不可修改的。接下来,新增一行并添加一些Case类,它们定义了这个示例中的Actor将使用的消息。为此,在这个源代码文件末尾添加如下代码:
case class AddQuote(quote: Quote)case class RequestQuote(originalSender: ActorRef)case object PrintRandomQuotecase object QuoteAddedcase class QuoteRequested(quote: Quote, originalSender: ActorRef)case object QuotePrinted
前两个类(AddQuote和RequestQuote)是可发送给前面定义的Actor QuotesHandlerActor的消息。第三个对象(PrintRandomQuote)是可发送给后面将创建的ActorQuotePrinterActor的消息。其他三个对象是可作为应答发回的消息。对于前述代码,需要注意的一些细节如下。
- 这些代码非常紧凑。在Scala中,主构造函数是在类定义中定义的,因此无需编写定义字段和存储构造函数参数的代码,这些工作由Scala负责。
- 这些类和对象都无需对字段做额外处理,因为都不需要类体。
- 不需要参数的消息被定义为单例对象。并非必须这样做,但创建这些消息的多个实例只会浪费内存。
- 单例对象也可以是
Case类。 - 消息
RequestQuote和QuoteRequested的构造函数都将一个ActorRef实例(Actor引用)作为参数,其中的原因将在后面阐述。
还有一件事情要做——告诉QuotesHandlerActor只存储Quote实例。我们将使用泛型来完成这项工作。打开QuotesHandlerActor类并找到如下代码行:
val quotes = new ListBuffer
将其改成下面这样:
val quotes = new ListBuffer[Quote]()
请使用组合键Ctrl + 空格来输入类名Quotes,这样将自动添加所需的语句import akkaquote.message。
6.3.6 编写基于ScalaTest的单元测试
为演示单元测试库ScalaTest的工作原理,我们将编写一个对Actor进行测试的单元测试。为添加ScalaTest单元测试用例,请右击src/test/scala并选择New>Scala Class,再将这个类命名为QuotesHandlerActorTests,然后删除生成的代码,并开始编写必要的import语句以及类定义本身:
import akka.actor.{ActorSystem, Props}import akka.testkit.{TestKit, ImplicitSender, TestActorRef}import org.scalatest.{Matchers, FlatSpecLike, BeforeAndAfterAll}import akkaquote.actor.QuotesHandlerActorimport akkaquote.message.{AddQuote, Quote, QuoteAdded}class QuotesHandlerActorTests()extends TestKit(ActorSystem("Tests"))with ImplicitSender with Matcherswith FlatSpecLike with BeforeAndAfterAll {}
这个类扩展了Akka模块TestKit中的TestKit类,后者提供了大量让你能够更轻松地对Akka Actor进行测试的方法。通过实现特质FlatSpecLike和Matchers,可使用ScalaTest的DSL以更自然的方式编写单元测试。实现特质ImplicitSender确保这个类将收到Actor以应答方式发送的消息。
所有测试都结束后,妥善地停止Akka系统至关重要,否则将导致内存泄露。在这个类中,添加如下方法:
override def afterAll(): Unit = {system.terminate()}
之所以可以重写方法afterAll(),是因为这个类实现了特质BeforeAndAfterAll。字段system是从TestKit类那里继承而来的。
在我们的测试中,将测试我们能够通过向Actor QuotesHandlerActor发送AddQuote消息来添加新引言;为此,在这个类中添加如下代码。注意,我们使用了ScalaTest的领域特定语言(domain-specific language,DSL)。乍一看,这些代码可能有点怪异:
"An QuotesHandlerActor" should "add new quotes" in {val quoteHandlerActorRef = TestActorRef(Props[QuotesHandlerActor])val actorInstance = quoteHandlerActorRef.underlyingActor.asInstanceOf[QuotesHandlerActor]actorInstance.quotes.size should be(0)val quote = new Quote("This is a test", "me")quoteHandlerActorRef ! AddQuote(quote)expectMsg(QuoteAdded)actorInstance.quotes.size should be(1)actorInstance.quotes(0).quote should be("This is a test")actorInstance.quotes(0).author should be("me")}
前述代码执行了很多任务,下面来详细说说。
- 第一行定义了一个测试,其中的第一个字符串(
A QuotesHandlerActor)指出了要测试的类或对象,该字符串后面是关键字should,它告诉ScalaTest这是一个测试。接下来的字符串指出了测试的是什么,这里为“收到消息AddQuote时添加一个新引言”。在关键字in的后面是一个代码块,其中包含测试的代码。 - 我们创建了一个
TestActorRef对象,它指向QuotesHandlerActor。TestActorRef是一个可用于单元测试的ActorRef实例,其功能之一是让你能够直接访问它包装的Actor实例QuotesHandlerActor。 - 我们通过这个
TestActorRef对象获取指向QuotesHandlerActor实例的引用。有了指向Actor对象的引用后,就可以测试其内部功能了。 - 通过使用Actor引用
actorInstance,我们检查其集合成员quotes的长度是否为零。如果不是,这个测试将失败。 - 我们向
TestActorRef实例发送消息AddQuote,该实例将把消息转发给实际的Actor。我们之所以这样做,是因为只有ActorRef类实现了运算符!。变量actorInstance指向一个Actor类实例,而TestActorRef是ActorRef类的一个实例。我们创建一条新引言,并将其作为消息的参数。 - 由于我们实现了特质
ImplicitSender,因此这个类将收到消息AddQuote发回的应答。方法expectMsg检查是否收到了指定的消息,其默认超时时间为3秒,就这里而言,这足够了。 - 接下来,我们检查
actorInstance中的列表quotes是否包含一个元素,并检查属性quote和author是否与我们发送给消息的引言匹配。
在有些Scala IDE版本中,存在一个与方法
expectMsg相关的bug,即在这个方法可用的情况下显示错误消息not found: value expectMsg。如果你遇到这种错误,可置之不理。这样Scala IDE也将正确地编译并运行代码。
说得差不多了,下面来运行这个单元测试:在Package Explorer中右击QuotesHandler-ActorTests类并选择Run As>ScalaTest-File。测试以失败告终,这没有什么可奇怪的。控制台包含的信息类似于下面这样:
Run starting. Expected test count is: 1QuotesHandlerActorTests:An QuotesHandlerActor- should add new quotes *** FAILED ***java.lang.AssertionError: assertion failed: timeout (3 seconds) duringexpectMsg while waiting for QuoteAdded
请切换到选项卡ScalaTest,以查看摘要:

我们还没有处理消息AddQuote,因此单元测试脚本没有在合理的时间内收到应答消息QuoteAdded。下面来改变这种现状。
6.3.7 实现消息处理程序
在Package Explorer中,打开akkaquote.actor包中的文件QuotesHandlerActor.scala,并在其中添加如下必不可少的import语句:
package akkaquote.actorimport akka.actor.Actorimport scala.collection.mutable.ListBufferimport scala.util.Randomimport akkaquote.message.{ Quote, AddQuote, QuoteAdded, RequestQuote,QuoteRequested }
将方法receive的当前实现替换为如下代码:
def receive = {case AddQuote(quote) => {quotes += quotesender ! QuoteAdded}case RequestQuote(originalSender) => {val index = Random.nextInt(quotes.size)sender ! QuoteRequested(quotes(index), originalSender)}}
拜神奇的Case类和模式匹配所赐,我们能够轻松地检查收到的是哪种消息,并做相应的处理。收到消息AddQuote时,将传递的对象quote加入列表quotes,并发回应答消息QuoteAdded。
这个Actor还处理消息RequestQuote。收到这种消息时,它从列表中随机选择一条引言,并将其传递给应答消息QuoteRequested。该应答消息将发回给发送消息的Actor,其中包含ActorRef实例originalSender,让消息接收方能够将该应答消息发回给请求引言的Actor,后面将更详细地解释这一点。
请打开目录src/test/scala中的文件QuotesHandlerActorTests.scala,右击这个文件并选择Run As>ScalaTest-File,以再次运行测试。这次测试应该通过了。祝贺你成功了!

6.3.8 创建QuotePrinterActor
下面来再来创建一个Actor,它向QuotesHandlerActor请求一条引言,并将返回的引言打印到控制台。这个Actor的工作原理如下:收到消息RequestQuote后,将其发送给QuotesHandlerActor。正如你在前面看到的,QuotesHandlerActor将发回应答消息QuoteRequested,其中包含一条随机的引言。收到这条消息后,我们要创建的Actor将把它打印到屏幕上。为节省篇幅,这里不对这个类做单元测试。
注意,在生产环境中,出于时间考虑而省却单元测试通常是非常糟糕的主意。
请新建一个类,并将其命名为akkaquote.actor.QuotePrinterActor。接下来,首先添加import语句和类体:
package akkaquote.actorimport akka.actor.{ Actor, ActorRef }import akkaquote.message.{ PrintRandomQuote, RequestQuote,QuoteRequested, QuotePrinted }class QuotePrinterActor(val quoteManagerActorRef: ActorRef) extendsActor {}
需要指出的是,这个类的主构造函数将一个ActorRef实例作为参数。通过使用ActorRef 对象,可使用运算符!向这个Actor发送消息。
在这个类中,添加消息处理程序:
def receive: Actor.Receive = {case PrintRandomQuote => {val originalSender = senderquoteManagerActorRef ! RequestQuote(originalSender)}case QuoteRequested(quote, originalSender) => {System.out.println('"' + quote.quote + '"')System.out.println("-- " + quote.author)originalSender ! QuotePrinted}}
收到PrintRandomQuote消息后,QuotePrinterActor向QuoteManagerActor发送RequestQuote消息,这是使用通过构造函数传入的Actor引用实例quoteManagerActorRef实现的。QuoteManagerActor将发回应答消息QuoteRequested,其中包含一条随机的引言。
收到QuoteRequested消息后,QuotePrinterActor将其打印到控制台。
在RequestQuote消息中添加了发送者,这看起来好像很复杂。下面来详细说说。收到PrintRandomQuote消息时,发送者是将该消息发送给QuotePrinterActor的Actor;此时不知道该Actor是谁,我们暂且称之为Actor X。下面的UML时序图说明了这个流程:

在RequestQuote消息中,以originalSender的方式添加了指向Actor X的引用。在发回给QuotePrinterActor的应答消息QuoteRequested中,QuoteManagerActor原封不动地传递了引用originalSender。将引言打印到控制台后,处理程序QuoteRequested可以向originalSender(Actor X)发送消息QuotePrint,让它知道向控制台打印了一条消息。如果使用Actor引用Sender而不是originalSender,QuotePrinted消息将发回给QuoteManagerActor,这并不是我们希望的。
6.3.9 主应用程序
下面来创建一个基于控制台的简单应用程序,它在集合中添加一些引言,并随机地打印一条。这个应用程序将通过向Actor发送消息来通信。在Actor中,消息处理程序不能是阻塞的,这意味着它不应在同一个线程中运行耗时的代码。我们的Actor符合这条规则,它们都直接处理消息并立即返回。
为了演示这两个Actor的工作原理,我们将在主程序中采取不同的做法:向一个Actor发送消息,并等到收到应答后再继续执行。这样做旨在确保添加所有引言后才请求打印引言。另外,我们还想在打印引言后再停止Akka系统和程序。
为此可使用ask模式。这种模式向Actor发送消息后立即返回,而不等待应答。它返回一个名为Future的对象的值;调度器可在独立的线程中运行Future对象,而不阻塞Akka应用程序。另外,我们也可等待Future对象中的代码运行完毕(暂停程序),直到收到应答或超时,这个示例将采取这种做法。
右击akkaquote包并选择New>Scala Object,以添加一个单例对象。将代码修改成下面这样:
package akkaquoteimport akka.actor.{ ActorSystem, Props, ActorRef }import akka.pattern.askimport akka.util.Timeoutimport scala.concurrent.Awaitimport scala.concurrent.duration.DurationIntimport scala.language.postfixOpsimport akkaquote.actor.{ QuotePrinterActor, QuotesHandlerActor }import akkaquote.message.{ Quote, AddQuote, RequestQuote,PrintRandomQuote }object Main extends App {}
这里包含的import语句很多。对Akka Actor来说,akka.actor包中的类至关重要。akka.pattern.ask包也必须导入,否则将无法识别方法ask或类似的运算符?。方法ask是一个开销很大的命令,Akka设计者希望程序员明白这一点。要等待Future对象执行完毕,Await类必不可少。为使用后缀表示法设置超时时间,必须导入Timeout、DurationInt和postfixOps类。
首先来初始化Akka和Actor,为此在这个对象中添加如下代码:
val system = ActorSystem("AkkaQuote")val quoteActorRef = system.actorOf(Props[QuotesHandlerActor],"quotesActor")val quotePrinterActorRef = system.actorOf(Props(newQuotePrinterActor(quoteActorRef)),"quotesPrinterActor")
接下来,添加初始化超时时间以及在QuotesHandlerActor中添加3条引言的代码:
implicit val timeout = Timeout(10 seconds)val future1 = quoteActorRef ? AddQuote(new Quote("Hello world","Various book authors"))val future2 = quoteActorRef ? AddQuote(new Quote("To be or not to be","W. Shakespeare"))val future3 = quoteActorRef ? AddQuote(new Quote("In the middle of difficulty lies opportunity","A. Einstein"))Await.result(future1, timeout.duration)Await.result(future2, timeout.duration)Await.result(future3, timeout.duration)
使用运算符?时(前面说过,必须导入akka.pattern.ask包,否则无法使用运算符?),将返回一个Future对象。对于每条消息,我们都等待应答或超时;如果超时,将引发异常。在这个小型应用程序中添加最后一段代码:
val future4 = quotePrinterActorRef ? PrintRandomQuoteAwait.result(future4, timeout.duration)system.terminate()
我们使用指向QuotePrinterActor的ActorRef实例向它发送PrintRandomQuote消息。同样,我们等待应答消息或超时。最后,调用方法terminate()来停止Akka系统。
有些Scala IDE版本存在bug,无法识别方法
system.terminate(),进而显示错误消息“value terminate is not a member of akka.actor.ActorSystem”,但这些代码能够通过编译并正确地运行。
这个程序将优雅地退出并妥善地释放所有的资源。
如果你按Ctrl + F11运行这个应用程序,将看到它向控制台打印了一条引言:

