13.3 软件事务性内存

软件事务性内存是一种机制,它为程序员在内存中访问数据提供了事务性语义。本节,你将学习如何在Groovy中应用这些元素。尽管我们并没有介绍Groovy编程语言,但是你可以通过互联网查找到很多关于Groovy编程语言的教程。在GPars的主页可以下载该库并查找有关如何使用它们的文档。正如前面提到的,你还可以在Java编程语言中使用该库。

13.3.1 使用Java元素

Groovy是一种针对JVM生成字节码的编程语言。你可以在Groovy程序中使用Java编程语言的所有元素,包括与并发处理相关的所有元素。

例如,在下面的例子中,你将创建一个线程。首先,使用main()方法声明一个名为Example1的Groovy类。

  1. class Example1 {
  2. static main(args) {

然后,使用Thread类的start()方法创建并执行一个线程。你可以指定要由该线程执行的代码。在本例中,我们将显示当前日期,休眠当前线程1秒钟,然后再次写入当前日期。

  1. def task = Thread.start {
  2. println Thread.currentThread().getName()+": Starting the thread:
  3. "+new Date();
  4. Thread.currentThread().sleep(1000);
  5. println Thread.currentThread().getName()+": Ending the thread:
  6. "+new Date();
  7. }

可以使用join()方法来等待该线程结束。

  1. task.join();
  2. println Thread.currentThread().getName()+": Main has ended: "
  3. +new Date();
  4. }
  5. }

当执行该应用程序时,你将看到该线程显示了第一个消息,并且在一秒钟后显示第二个消息。然后,当它完成执行时,main()方法显示了它的消息。

13.3.2 数据并行处理

在本节中,我们将采用Groovy编程语言提供的所有元素以并发方式处理数据结构。我们要考虑的第一个元素是GParsPool类。这个类是基于Fork/Join框架的JSR-166y的实现,它在以并发方式处理数据结构方面性能非常好。

我们来看一个使用GParsPool类的例子。首先,我们要包含必要的import语句。然后,使用main()方法创建一个名为Example2的类。

  1. import groovyx.gpars.GParsPool
  2. import static groovyx.gpars.GParsPool.withPool
  3. class Example2 {
  4. static main(args) {

然后,声明一个从1到1000的数值范围,并使用withPool语句以并行方式处理这些全部数值。我们使用println方法来输出处理该数值的线程名称,以及当前处理的数值。可以使用it变量来访问该数值。

  1. def numbers = 1..1000;
  2. println "Example 2 - Part 1"
  3. withPool {
  4. numbers.eachParallel {
  5. println Thread.currentThread().getName() +": "+ it;
  6. }
  7. }

然后,使用withPool语句,不过现在该语句带有一个表示可使用的最大线程数的参数。

  1. println "Example 2 - Part 2"
  2. withPool(4){
  3. List numberList = numbers.collectParallel { it *it}
  4. List smallNumberList = numberList.findAllParallel{ it < 100 }
  5. smallNumberList.eachParallel {
  6. println Thread.currentThread().getName() +": "+ it;
  7. }
  8. }

我们使用Groovy提供的三种方法并行处理该范围内的数值。可以使用collectParallel()方法来计算每个数值的平方。可以使用findAllParallel()方法来进行数值筛选,只接收那些小于100的数值。最后,可以使用eachParallel()方法来处理结果列表的所有方法。

可以使用其他方法并行处理符合某种数据结构的数据,例如minParallel()maxParallel()countParallel()。通过查看GPars API可以了解这些方法的所有详细信息。

以下屏幕截图显示了该应用程序的执行情况。

13.3 软件事务性内存 - 图1

GParsPool类提供的另一个选项是使用callAsync()executeAsyncAndWait()方法在不同的线程中调用闭包。第一个方法在不同的线程中启动闭包的执行,并且立即返回;而另一个方法则在返回之前等待闭包结束。让我们来看一个使用这些函数的例子。

首先,我们包含import语句,并且使用main()方法创建一个名为Example3的新类。在main()方法中,创建两个名为code1code2的闭包。

  1. import groovyx.gpars.GParsPool
  2. class Example3 {
  3. static main(args) {
  4. Closure code1 = {
  5. println "Closure 1: "+Thread.currentThread().getName()+": Start:"
  6. +new Date();
  7. Thread.currentThread().sleep(1000)
  8. println "Closure 1: "+Thread.currentThread().getName()+": End: "
  9. +new Date();
  10. }
  11. ...
  12. Closure code2 = {
  13. println "Closure 2: "+Thread.currentThread().getName()+": Start:"
  14. +new Date();
  15. Thread.currentThread().sleep(2000)
  16. println "Closure 2: "+Thread.currentThread().getName()+": End: "
  17. +new Date();
  18. }

首先,使用Groovy的普通语法按顺序调用两个闭包。

  1. println "Closure 1 sequential"
  2. code1.call();
  3. println "Closure 2 sequential"
  4. code2.call();

然后,使用GParsPool类的withPool方法,调用callAsync()方法以并发方式执行code1闭包,然后使用GParsPool类的executeAsyncAndWait()方法来执行code1闭包和code2闭包。

  1. GParsPool.withPool {
  2. println "Closure 1 async";
  3. code1.callAsync();
  4. println "Closure 1 and closure 2 async with wait"
  5. GParsPool.executeAsyncAndWait(code1,code2);
  6. println "End"
  7. }
  8. println "Main end"
  9. }
  10. }

下面的屏幕截图显示了执行本例后的输出。

13.3 软件事务性内存 - 图2

可以看到,可以方便地区分闭包的顺序执行和并发执行(通过线程的名称)。

GParsPool类的另一个选项是使用Map/Reduce编程模型来并行处理任何数据结构。当你在Groovy中使用Map/Reduce时,你的数据结构在内部转换为一个并行数组,你使用的所有方法都将作用于该数据结构。它类似于Java编程语言中的流处理。

让我们看一个使用这一功能的例子。首先,引入必要的import语句,并且创建一个名为Example4的新类,其中含有main()方法。在该方法中,声明一个1到10 000之间的范围。

  1. import groovyx.gpars.GparsPool
  2. class Example4 {
  3. static main(args) {
  4. def numbers = 1..10000

然后,使用withPool语句和Fork/Join功能以并行方式处理该范围中的数值。我们使用parallel方法将该数值范围转换成一个并行数据结构,使用map方法将每个元素替换为该元素的平方,使用filter方法只保留那些小于100 000的数值,使用sum方法对列表中的所有元素求和。

  1. GParsPool.withPool {
  2. int result = numbers.parallel.map{it*it}.filter{it < 100000}
  3. .sum();
  4. println result;

然后,我们看看该功能的其他示例。动态创建另一个介于1和1 000 000之间的数值范围,使用parallel方法将该范围转换成一个并行数据结构,使用filter方法只保留偶数,使用map方法将每个数值替换成其平方根,最后使用collection方法将并行数据结构转换成一个列表。

  1. List numberList = (1..1000000).parallel.filter{it % 2 == 0}
  2. .map{Math.sqrt it}.collection
  3. numberList.forEach{
  4. println it;
  5. }
  6. }
  7. }
  8. }

执行该示例时,可以在控制台中看到输出的数值。

最后,关于GParsPool类我们要学习的最后一点是如何使用Promise来获取异步函数的值。让我们看一个使用该功能的例子。首先,创建一个名为Example5的类,其中含有main()方法,以及一个名为code1的闭包。

  1. import static groovyx.gpars.GParsPool.withPool;
  2. class Example5 {
  3. static main(args) {
  4. Closure code1 = {
  5. println "Closure 1: "+Thread.currentThread().getName()+": Start:"
  6. +new Date();
  7. Thread.currentThread().sleep(1000)
  8. println "Closure 1: "+Thread.currentThread().getName()+": End: "
  9. +new Date();
  10. return new Date().toString();
  11. }

然后,使用withPool语句调用asyncFun()方法,以异步方式执行code1闭包,然后生成一个含有该方法结果的Promise。最后,使用该Promise的get()方法来获得code1闭包的结果。请注意,get()方法休眠调用线程,直到闭包完成执行。

  1. withPool {
  2. def aCode1 = code1.asyncFun();
  3. def promise = aCode1();
  4. println "We have call the closure";
  5. println "The result is : "+promise.get();
  6. }
  7. }
  8. }

下面的屏幕截图展示了执行本例后得到的输出结果。

13.3 软件事务性内存 - 图3

13.3.3 Fork/Join处理

GPars提供的Fork/Join实现类似于Java并发API中提供的Fork/Join实现。该功能的主要目的是利用分治技术解决问题。第一次执行该算法时,针对的是一个完整的问题,可以检查该问题的规模。如果其规模小于预先定义的规模,则可以直接解决问题。否则,你可以将问题划分为预定义数目的小问题并且进行异步递归调用,每个子问题进行一次递归调用。每次递归调用的处理过程都是相同的。你可以再次检查问题的规模,如果它小于预定义的规模,就可以直接进行求解;否则,再次分割问题并再次进行递归调用。当所有递归调用都结束后,启动这些调用的方法将再次得到控制权,获取每次调用的结果并将这些结果分组,最后返回结果。最后,通过分组求解许多小问题,我们求解了一个大规模问题。

请注意,并不是所有的算法都可以使用这种技术来求解,但是只要你可以使用这种技术,就可以对资源进行优化使用,得到非常好的性能结果。

GPars库提供了以下方法来使用Fork/Join框架。

  • runForkJoin():该方法创建一个Fork/Join执行过程。你必须指定算法的参数和实现该算法的闭包。递归调用具有相同的参数。
  • forOffChild():创建一个新的子任务来执行子问题。该任务将在未来执行。该方法将待调度任务发送到正在执行全部任务的ForkJoinPool中,并且立即返回。
  • runChildDirectly():在当前线程中运行子任务,并且当其结束执行时返回。
  • getChildrenResults():等待所有子任务最终完成,并且返回一个含有结果的List对象。可以使用该列表来计算将由任务返回的结果。

让我们看一个如何使用GPars库的Fork/Join框架的示例。我们将实现一个函数,它计算目录中以.log为扩展名的文件数目。首先,包含必要的import语句并创建一个名为Example6的类,其中含有main()方法。

  1. import static groovyx.gpars.GParsPool.withPool;
  2. import static groovyx.gpars.GParsPool.runForkJoin;
  3. class Example6 {
  4. static main(args) {

然后,在withPool命令中调用runForkJoin()方法,将File对象作为参数传递。该File对象含有我们要开始寻找扩展名为.log的文件的路径。我们必须指定算法的代码。对于作为参数接收的目录,我们处理其中包含的所有文件和目录。如果是文件,检查其扩展名是否为log。如果扩展名是log,就增加计数器的值。如果是目录,那么使用forkOffChild()方法进行异步递归调用。

当处理完所有的项目后,就得到了所有子任务的结果,并将这些任务的结果和计数器相加。最后的值就是返回的结果。

  1. withPool() {
  2. def count = runForkJoin(new File("c:\\windows")) {file ->
  3. long count = 0
  4. file.eachFile {
  5. if (it.isDirectory()) {
  6. println "Forking a child task for $it"
  7. forkOffChild(it)
  8. } else {
  9. if (it.getName().endsWith("log")) {
  10. count++;
  11. println it.getName();
  12. }
  13. }
  14. }
  15. return count + (childrenResults.sum(0))
  16. }

要注意,子任务也可以有子任务,以此类推。最后,当初始调用结束时,输出最终结果。

  1. println "Total: "+ count;
  2. }
  3. }
  4. }

执行本例时,你可以看到文件的总数。

13.3.4 Actor

Actor实现了消息传递并发模型。每个Actor都是一个独立的对象,它向其他Actor发送消息并且接收来自其他Actor的消息。Actor和线程之间并没有关联。线程可以执行不同的Actor,而一个Actor也可以由不同的线程执行。Actor没有共享状态,GPars保证了Actor的代码可被执行,这样就不会丢失消息。每当线程被分配给一个Actor时,内存也会随之同步,因此不需要显式同步。Actor有两种类型。

  • 无状态Actor:基于DynamicDispatchActor类或者ReactiveActor类。它们无法追踪此前曾有哪些消息到达。
  • 有状态Actor:基于DefaultActor类。该Actor可以管理内部状态,每个消息都可以改变该状态以及处理该消息的方式。

Actor最大的好处之一在于你可以在系统中获得吞吐量。只有在需要处理消息时,才会执行Actor,因此你可以拥有大量需要少量线程运行的Actor。

当使用Actor时,你会使用下述方法来做最常见的操作。

  • send():该方法向Actor异步发送消息。该方法将立即返回,并不会等待响应。
  • sendAndWait():该方法向Actor发送消息并且等待响应。
  • sendAndContinue():该方法向Actor发送消息并且立即返回。它接收一个闭包作为参数,并在该消息的响应到达时执行该闭包。
  • sendAndPromise():该方法向Actor发送消息并且返回一个Promise,可以通过该Promise来获得该消息的响应。
  • react():该方法将被调用来处理下一条消息。通常,该方法会包含在一个循环语句中,以处理Actor接收到的所有消息。
  • reply():该方法向消息的发送者发送应答。
  • forward():该方法允许我们将接收到的消息发送给另一个Actor。
  • join():该方法等待Actor结束。

还有其他不同的方法可以创建Actor。

你可以使用Actors类的actor()方法。在这种情况下,你可以使用闭包来指定Actor的代码。Actor将立即开始执行。

你可以扩展DefaultActor类并且实现act()方法。在这种情况下,我们必须调用Actor的start()方法来开始其执行过程。

你可以扩展DynamicDispatchACtor类,并且实现onMessage()方法的一个或多个版本(Actor可接收到的每种消息各一个版本)。

最后,Actor有生命周期以及一些相关的方法,你可以实现这些方法来在该生命周期的确定状态下执行操作。这些方法包括:

  • afterStart()
  • afterStop()
  • onTimeOut()
  • onInterrupt()
  • onException()

这些方法的功能恰如其名,因此无须额外描述。

下面用三个例子来说明如何使用Actor。在第一个例子中,仅创建两个基本的Actor对象,并且将在它们之间发送一条消息。

创建一个名为Example7的类,其中含有main()方法。

  1. import groovyx.gpars.actor.Actor
  2. import groovyx.gpars.actor.Actors
  3. class Example7 {
  4. static main(args) {

然后,使用Actor类的actor()方法创建一个Actor。在该Actor的代码中,我们包含了react()方法的代码。在我们的例子中,当消息到达时,在控制台输出它,然后发送对该消息的响应,其中包括当前线程的名称和文本“Ok”。

  1. def receiver = Actors.actor {
  2. println Thread.currentThread().getName()+": Receiver is running"
  3. react { msg ->
  4. println Thread.currentThread().getName()+": Recevier: I've
  5. received a message: "+msg
  6. reply Thread.currentThread().getName()+": Ok"
  7. }
  8. println Thread.currentThread().getName()+": Receiver has finished"
  9. }

然后,我们再次使用actors()方法创建另一个Actor。在这种情况下,我们使用send()方法向另一个Actor发送一条消息,其中还包含了在Actor收到消息时react()方法要执行的代码。它将在控制台中输出消息。

  1. def sender = Actors.actor {
  2. println Thread.currentThread().getName()+": Sender is running"
  3. receiver.send Thread.currentThread().getName()+": From sender to
  4. receiver"
  5. react { msg ->
  6. println Thread.currentThread().getName()+": Sender: The response
  7. has arrived: "+msg
  8. }
  9. println Thread.currentThread().getName()+": Sender has finished"
  10. }

正如之前解释的那样,两个Actor都将立即开始执行。最后,在main()方法中,我们使用join()方法等待两个线程结束。

  1. sender.join();
  2. receiver.join();
  3. }
  4. }

下面的屏幕截图显示了执行本例后的输出结果。

13.3 软件事务性内存 - 图4

你可以看到发送方发送的消息如何到达接收方,而接收方如何将应答发送到发送方。

第二个示例是生产者/消费者问题的实现。首先,我们将实现消费者类。创建一个名为Consumer的类,并且指定它实现DefaultActor类。

  1. import groovyx.gpars.actor.Actor
  2. import groovyx.gpars.actor.DefaultActor
  3. class Consumer extends DefaultActor {

然后,实现包含Actor主代码的act()方法。我们使用循环语句处理所有消息和react()方法,该方法将在Actor接收到的每条消息上调用。我们将参数5000传递给react()方法。如果Actor等待了5秒钟却没有收到消息,那么它将抛出一个超时错误并结束其执行。对于每条消息,我们只在控制台上输出关于该消息和发送方的信息。

  1. void act() {
  2. loop {
  3. react(5000) { msg ->
  4. println "*****************************";
  5. println "Thread Name: "+Thread.currentThread().getName();
  6. println "Sender: "+sender.remoteClass;
  7. println "Message: "+msg;
  8. println "*****************************";
  9. }
  10. }
  11. }

然后,我们执行Actor的一些生命周期方法,以便在控制台中输出有关这些事件的信息。

  1. void afterStart() {
  2. println "Consumer: After Start";
  3. }
  4. void afterStop(List undeliveredMessages) {
  5. println "Consumer: After Stop";
  6. println "Undelivered Messages: "+undeliveredMessages.size()
  7. }
  8. void onInterrupt(InterruptedException e) {
  9. println "Consumer: Interrupted"
  10. e.printStackTrace()
  11. terminate()
  12. }
  13. void onTimeout() {
  14. println "Consumer: Timeout"
  15. terminate()
  16. }
  17. void onException(Throwable e) {
  18. println "Consumer: An exception has ocurred"
  19. e.printStackTrace()
  20. }
  21. }

现在该实现生产者类了。创建一个名为Producer的类,并且指定它实现DefaultActor类。该类有两个属性,名称分别是发送消息的ProducerConsumer,并且使用构造函数来初始化它们。

  1. import java.lang.invoke.AbstractValidatingLambdaMetafactory
  2. import java.util.List
  3. import groovyx.gpars.actor.DefaultActor
  4. import groovyx.gpars.actor.Actor
  5. class Producer extends DefaultActor {
  6. private Actor consumer;
  7. private String name;
  8. def Producer (Actor consumer, String name) {
  9. this.consumer = consumer
  10. this.name = name
  11. }

现在,使用Actor的主代码实现act()方法。它将向消费者发送100条消息并结束执行。

  1. void act() {
  2. def i;
  3. for (i = 0; i<100; i++) {
  4. def msg = Thread.currentThread().getName()
  5. msg+= ": "+name
  6. msg+= ": Message "+i;
  7. consumer.send msg;
  8. Thread.currentThread().sleep(500);
  9. }
  10. }

最后,我们编写afterStop()方法的代码,该方法在控制台输出一条消息。

  1. void afterStop(List undeliveredMessages) {
  2. println name+": After Stop";
  3. }
  4. }

现在,创建一个名为Example8的类,其中含有main()方法。

  1. import groovyx.gpars.actor.Actor
  2. import groovyx.gpars.actor.DefaultActor
  3. class Example8 {
  4. static main(args) {
  5. Consumer consumer = new Consumer();
  6. consumer.start();
  7. Producer producer1 = new Producer(consumer,"Producer 1");
  8. Producer producer2 = new Producer(consumer, "Producer 2");
  9. producer1.start();
  10. Thread.currentThread().sleep(300);
  11. producer2.start();
  12. consumer.join();
  13. println "Main end"
  14. }
  15. }

main()方法中,我们创建一个消费者和两个生产者,并且使用start()方法启动三个Actor。我们使用join()方法等待消费者Actor结束。在生产者发送TimeOut异常5秒钟后,该Actor将会结束。

下面的屏幕截图显示了执行该例时的输出结果。

13.3 软件事务性内存 - 图5

你可以看到生产者如何结束其执行并且输出afterStop()方法的消息。然后,消费者出现一个TimeOut异常,并且执行onTimeOut()afterStop()方法。然后,主程序结束其执行。

最后一个关于Actor的示例将向你展示如何使用无状态Actor。首先,创建一个名为Event的类,该类有两个属性:一个名为msgString属性和一个名为dateDate属性。

  1. class Event {
  2. String msg;
  3. Date date;
  4. @Override
  5. public String toString() {
  6. return "Event: "+msg+": on "+date;
  7. }
  8. }

现在,创建一个名为Logger的类,并且指定它扩展DynamicDispatchActor类。我们实现onMessage()方法的三个版本,它们分别接收Event对象、String对象和Exception对象作为参数。我们在控制台中仅输出有关它收到的消息的信息。

  1. class Logger extends DynamicDispatchActor {
  2. def onMessage (Event event) {
  3. println "Logger: "+Thread.currentThread().getName()+": "+event;
  4. replyIfExists "Logger: Event received";
  5. }
  6. def onMessage(String msg) {
  7. println "Logger: "+Thread.currentThread().getName()+
  8. ": Direct mgs: "+msg;
  9. replyIfExists "Logger: Direct msg received";
  10. }
  11. def onMessage(Exception e) {
  12. println "Logger: "+Thread.currentThread().getName()+": Error:
  13. "+e.getLocalizedMessage();
  14. replyIfExists "Logger: Error received"
  15. }
  16. }

最后,创建一个名为Example9的类以及main()方法。首先,创建一个Logger类的Actor,并使用start()方法启动其执行。

  1. import groovyx.gpars.actor.Actor
  2. import groovyx.gpars.actor.Actors
  3. class Example9 {
  4. static main(args) {
  5. Logger logger = new Logger();
  6. logger.start();

然后,使用Actors类的actor()方法创建另一个Actor。在代码中,我们向Logger类发送三个消息(每种类型一个),并且包含用于处理三个响应的代码。

  1. def tester = Actors.actor {
  2. println "Tester: "+Thread.currentThread().getName()+
  3. ": is running"
  4. loop(3) {
  5. react(1000) { msg ->
  6. println "Tester: "+Thread.currentThread().getName()+
  7. ": I've received a message: "+msg
  8. }
  9. }
  10. Event event = new Event()
  11. event.msg = "I'm an event"
  12. event.date = new Date()
  13. logger.send event
  14. logger.send "I'm a message"
  15. Exception e = new Exception("I'm an exception")
  16. logger.send e;
  17. println "Tester: "+Thread.currentThread().getName()+
  18. ": Tester has finished"
  19. }

最后,使用join()方法等待tester Actor结束,使用stop()方法停止logger Actor,并且使用join()方法等待其最终结束。

  1. tester.join();
  2. logger.stop();
  3. logger.join();
  4. println "Main End"
  5. }
  6. }

下面的屏幕截图展示了执行本例后的输出结果。

13.3 软件事务性内存 - 图6

13.3.5 Agent

Agent保护可变数据对象,使之可以在线程之间安全地共享。Agent接受消息并以异步方式处理它们。消息是可以在Agent内部执行的函数或Groovy闭包。函数的返回值或闭包将成为Agent的新值/状态。函数或闭包将Agent的当前值/状态作为参数。

我们发送给Agent的命令是按顺序存储的,而且是一个接一个地处理,因此不会出现任何竞争条件。

要创建Agent,需要创建Agent类的一个新对象,用Agent中存储的取值类型对Agent类进行参数化。

当你使用Agent时,通常使用以下方法。

  • send():该方法向Agent发送一个命令。
  • addListener():该方法添加一个监听器,每当Agent的值发生变化时就会得到通知。
  • addValidator():该方法添加一个类似于监听器的验证器,但是可以拒绝对抛出异常的Agent的值做出更改。

让我们实现一个示例,看看如何使用Agent。首先,创建一个名为Account的类,它含有一个名为value的内部整型属性,一个名为increment()的方法(用于递增value属性的值),一个名为decrement()的方法(用于递减value属性的值),以及返回value属性值的方法。

  1. class Account {
  2. private int value = 0;
  3. def void increment (int amount) {
  4. println "Account.increment: "+Thread.currentThread().getName()+": "
  5. +amount;
  6. value+=amount
  7. }
  8. def void decrement (int amount) {
  9. println "Account.decrement: "+Thread.currentThread().getName()+": "
  10. +amount;
  11. value-=amount
  12. }
  13. def int getValue() {
  14. return value;
  15. }
  16. }

然后,创建一个名为Example10的类以及main()方法。创建一个存储Account对象的新Agent。

  1. class Example10 {
  2. static main(args) {
  3. Agent agent = new Agent<Account>(new Account())

然后,创建一个Actor,对该Agent的Account对象调用100次increment()方法。你可以使用it变量来访问该Agent的当前值。

  1. def incrementer = Actors.actor {
  2. for (def i=0; i<100; i++) {
  3. agent.send {it.increment(1000)}
  4. }
  5. }

现在,创建另一个Actor。该Actor将对存储在该Agent中的Account对象调用99次decrement()方法。

  1. def decrementer = Actors.actor {
  2. for (def i=0; i<99; i++) {
  3. agent.send {it.decrement(1000)}
  4. }
  5. }

最后,等待两个Actor执行结束并且输出该Agent的最终值。

  1. incrementer.join()
  2. decrementer.join()
  3. println "Final value: "+agent.val.getValue()
  4. }
  5. }

如果执行该示例,你将会看到结果为1000(100次递增操作和99次递减操作)。

13.3.6 Dataflow

Dataflow为生产者和消费者之间共享数据提供了安全通道。Dataflow最基本的元素是Dataflow变量。你只需创建一个Dataflows类的对象,然后我们可以在其之上定义变量。这些变量有两个重要特征。

  • 只能设置一次值。
  • 当一个任务试图使用Dataflow变量的值时,它的执行线程将被阻塞,直到该变量有值为止。

使用Dataflow变量可以获得以下好处。

  • 没有竞争条件。
  • 不需要显式地使用锁或其他同步机制。
  • 如果存在由Dataflow变量引发的死锁,可以确定其原因。

我们来看一个使用Dataflow变量的例子。首先,创建一个名为Example1的类,其中含有main()方法。

  1. import static groovyx.gpars.dataflow.Dataflow.task;
  2. import java.util.concurrent.TimeUnit
  3. import groovyx.gpars.dataflow.Dataflows;
  4. class Example11 {
  5. static main(args) {

现在,创建一个Dataflows对象和一个Date对象(其中含有该方法启动执行的日期)。

  1. def store = new Dataflows()
  2. def mainStart = new Date();
  3. println "Main: Start "+mainStart

现在,启动一个逻辑任务,它将由另一个使用task函数的线程执行。将其执行线程休眠1秒钟,然后在Dataflows对象中创建一个变量,并且为其赋值为3。

  1. task {
  2. TimeUnit.SECONDS.sleep(1)
  3. store.x = 3
  4. }

现在,创建另一个与前述任务类似的任务。我们将执行线程休眠2秒钟,然后将一个名为y的变量指派给它,其值为4。

  1. task {
  2. TimeUnit.SECONDS.sleep(2)
  3. store.y = 4
  4. }

然后,创建第三个任务,该任务将计算变量xy之间的和,并且将该值存储在另一个名为z的Dataflow变量中。

  1. task {
  2. def start = new Date()
  3. println "Calculus Task: "+start
  4. store.z = store.x + store.y
  5. def end = new Date()
  6. println "Calculus Task: "+end
  7. }

最后,在main()方法中输出变量z的值。

  1. println "Main: The final result is: "+store.z
  2. println "Main: End"
  3. }
  4. }

下面的屏幕截图显示了执行这个例子后的输出结果。

13.3 软件事务性内存 - 图7

我们还可以创建一个DataflowVariable类的对象,并且使用<<运算符给它赋值。例如,创建一个名为Example13的类以及main()方法,并且创建一个名为dataDataflowVariable类的对象。

  1. import static groovyx.gpars.dataflow.Dataflow.task;
  2. import java.util.concurrent.TimeUnit
  3. import groovyx.gpars.dataflow.DataflowVariable;
  4. class Example13 {
  5. static main(args) {
  6. def data = new DataflowVariable()

现在,创建一个任务将其执行线程休眠2秒钟,并且使用<<运算符将值2赋给该变量。

  1. task {
  2. println Thread.currentThread().getName()+": Wait two seconds to
  3. set the value"
  4. TimeUnit.SECONDS.sleep(2);
  5. data << 2;
  6. }

最后,在main()方法中包含一个输出data变量取值的语句。

  1. println Thread.currentThread().getName()+" : Bind handler : "
  2. +data.val;
  3. }
  4. }

当你执行该示例时,将看到任务所输出的消息,以及两秒之后由main()方法所输出的消息,其中含有DataflowVariable对象的值。

Dataflow提供的另一个元素是Dataflow广播。它允许我们在生产者和消费者之间发送数据,就像在它们之间存在一个队列一样。它提供了一种发布—订阅机制,以便支持多个生产者与一个或多个消费者交互的情况。

下面来看看这种机制是如何运作的。首先,创建一个名为Producer的类。它有两个私有属性:一个名为broadcastDataflowBroadcast对象和一个名为nameString对象。使用该类的构造函数来初始化这两个属性。

  1. import java.util.concurrent.TimeUnit
  2. import groovyx.gpars.dataflow.DataflowBroadcast;
  3. class Producer {
  4. private DataflowBroadcast broadcast
  5. private String name
  6. public Producer (DataflowBroadcast broadcast, String name) {
  7. this.broadcast = broadcast
  8. this.name = name
  9. }

现在,实现一个名为execute()的方法。在该方法中,使用<<运算符将100个String对象写入broadcast对象。在每条消息之间,将执行线程休眠500毫秒。

  1. public void execute() {
  2. for (int i=0; i<100; i++) {
  3. def msg = name + " MSG "+i+" : "+new Date();
  4. broadcast << msg
  5. TimeUnit.MILLISECONDS.sleep(500);
  6. }
  7. }
  8. }

现在,创建一个名为Consumer的类。该类将和Producer类具有相同的属性。使用该类的构造函数来初始化这两个属性。

  1. import groovyx.gpars.dataflow.DataflowBroadcast
  2. import groovyx.gpars.dataflow.DataflowReadChannel
  3. class Consumer {
  4. private DataflowBroadcast broadcast
  5. private String name
  6. public Consumer (DataflowBroadcast broadcast, String name) {
  7. this.broadcast = broadcast
  8. this.name = name
  9. }

现在,实现execute()方法。首先,创建一个DataflowReadChannel类的对象,读取来自DataflowBroadcast的值。然后,使用val函数编写200条消息。该函数将会休眠当前线程,直到DataflowBroadcast中的新数据可用。

  1. public void execute() {
  2. DataflowReadChannel stream = broadcast.createReadChannel()
  3. for (int i=0; i<200; i++) {
  4. println "Consumer "+name+": "+stream.val
  5. }
  6. }
  7. }

我们有了生产者和消费者。现在该让它们工作了。创建一个名为Example12的类,其中含有main()方法。创建一个DataflowBroadcast对象、两个生产者和三个消费者。创建一个线程来执行每个生产者和每个消费者。然后,使用join()方法等待它们结束。

  1. import groovyx.gpars.dataflow.DataflowBroadcast
  2. import static groovyx.gpars.dataflow.Dataflow.task
  3. class Example12 {
  4. static main(args) {
  5. DataflowBroadcast dataflow = new DataflowBroadcast()
  6. def producer1, producer2, consumer1, consumer2, consumer3
  7. Thread thread1 = Thread.start {
  8. producer1 = new Producer(dataflow, "Producer 1")
  9. producer1.execute()
  10. }
  11. Thread thread2 = Thread.start {
  12. producer2 = new Producer(dataflow, "Producer 2")
  13. producer2.execute()
  14. }
  15. Thread thread3 = Thread.start{
  16. consumer1 = new Consumer(dataflow, "Consumer 1")
  17. consumer1.execute()
  18. }
  19. Thread thread4 = Thread.start {
  20. consumer2 = new Consumer(dataflow, "Consumer 2")
  21. consumer2.execute()
  22. }
  23. Thread thread5 = Thread.start {
  24. consumer3 = new Consumer(dataflow, "Consumer 3")
  25. consumer3.execute()
  26. }
  27. thread1.join()
  28. thread2.join()
  29. thread3.join()
  30. thread4.join()
  31. thread5.join()
  32. println "Main: end"
  33. }
  34. }

下面的屏幕截图显示了执行本例后的输出结果。

13.3 软件事务性内存 - 图8

可以看到由生产者生成的每条消息如何到达三个消费者。

Dataflow提供的另一个功能是使用select()函数从多个通道中选择一个值。该函数接收一个通道列表作为参数,它将从全部含有可读值的通道中选择一个。该函数返回一个SelectResult对象,其中含有返回的值和它所选通道的信息。这种机制也是可配置的,例如,对某些渠道进行优先级排序。

我们来看看这种机制是如何运作的。首先,创建一个名为Example14的类,其中含有main()方法。创建名为source1source2source3的三个DataflowVariable对象。

  1. import static groovyx.gpars.dataflow.Dataflow.task
  2. import static groovyx.gpars.dataflow.Dataflow.select
  3. import java.util.concurrent.TimeUnit
  4. import groovyx.gpars.dataflow.DataflowVariable
  5. class Example14 {
  6. static main(args) {
  7. def source1 = new DataflowVariable()
  8. def source2 = new DataflowVariable()
  9. def source3 = new DataflowVariable()

现在,创建三个任务来为每个数据源提供一个值。每个任务在为其DataflowVariable对象赋值之前,会将其执行线程休眠不同的时间。

  1. task {
  2. TimeUnit.SECONDS.sleep(3);
  3. source1 << "source1"
  4. }
  5. task {
  6. TimeUnit.SECONDS.sleep(5);
  7. source2 << "source2"
  8. }
  9. task {
  10. TimeUnit.SECONDS.sleep(1);
  11. source3 << "source3"
  12. }

现在,使用select函数从这些数据源获取值,并且将其输出到控制台。

  1. def result = select([source1, source2, source3])
  2. println "Main: "+result.select()
  3. }
  4. }

下面的屏幕截图显示了执行该例后的输出结果。

13.3 软件事务性内存 - 图9

在本例中,source3对象首先得到值,因此select函数将在一秒钟之后返回它。

我们要分析的最后一种Dataflow机制是运算符。运算符从输入通道接收值,并且生成新值写入输出通道。所有这些通道都是Dataflow的变量。运算符将等待所有输入通道,直到它开始执行为止。

我们来看看这种机制是如何运行的。创建一个名为Example15的类,其中含有main()方法。创建名为abcd的四个DataflowVariable对象。

  1. import groovyx.gpars.dataflow.DataflowVariable;
  2. import static groovyx.gpars.dataflow.Dataflow.operator;
  3. import java.util.concurrent.TimeUnit
  4. class Example15 {
  5. static main(args) {
  6. def a = new DataflowVariable();
  7. def b = new DataflowVariable();
  8. def c = new DataflowVariable();
  9. def d = new DataflowVariable();

现在使用operator命令创建一个名为op的新运算符。它接收三个输入,即Dataflow变量abc,并且返回Dataflow变量d的值。我们使用bindOutput函数来确定输出的值。

  1. def op = operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
  2. println "Operator"
  3. bindOutput 0, x + y + z
  4. }

最后,为变量abc赋值,并且使用变量dval属性将DataflowVariable的值输出到控制台。

  1. a << 3;
  2. b << 5;
  3. c << 7;
  4. println "Main: "+d.val
  5. }
  6. }

当我们将值赋给三个DataflowVariable对象时,运算符执行其代码。当其完成之后,DataflowVariable d就有了值,并且在main()方法的最后一条语句中输出。

下面的屏幕截图显示了执行本例后的输出结果。

13.3 软件事务性内存 - 图10