13.3 软件事务性内存
软件事务性内存是一种机制,它为程序员在内存中访问数据提供了事务性语义。本节,你将学习如何在Groovy中应用这些元素。尽管我们并没有介绍Groovy编程语言,但是你可以通过互联网查找到很多关于Groovy编程语言的教程。在GPars的主页可以下载该库并查找有关如何使用它们的文档。正如前面提到的,你还可以在Java编程语言中使用该库。
13.3.1 使用Java元素
Groovy是一种针对JVM生成字节码的编程语言。你可以在Groovy程序中使用Java编程语言的所有元素,包括与并发处理相关的所有元素。
例如,在下面的例子中,你将创建一个线程。首先,使用main()方法声明一个名为Example1的Groovy类。
class Example1 {static main(args) {
然后,使用Thread类的start()方法创建并执行一个线程。你可以指定要由该线程执行的代码。在本例中,我们将显示当前日期,休眠当前线程1秒钟,然后再次写入当前日期。
def task = Thread.start {println Thread.currentThread().getName()+": Starting the thread:"+new Date();Thread.currentThread().sleep(1000);println Thread.currentThread().getName()+": Ending the thread:"+new Date();}
可以使用join()方法来等待该线程结束。
task.join();println Thread.currentThread().getName()+": Main has ended: "+new Date();}}
当执行该应用程序时,你将看到该线程显示了第一个消息,并且在一秒钟后显示第二个消息。然后,当它完成执行时,main()方法显示了它的消息。
13.3.2 数据并行处理
在本节中,我们将采用Groovy编程语言提供的所有元素以并发方式处理数据结构。我们要考虑的第一个元素是GParsPool类。这个类是基于Fork/Join框架的JSR-166y的实现,它在以并发方式处理数据结构方面性能非常好。
我们来看一个使用GParsPool类的例子。首先,我们要包含必要的import语句。然后,使用main()方法创建一个名为Example2的类。
import groovyx.gpars.GParsPoolimport static groovyx.gpars.GParsPool.withPoolclass Example2 {static main(args) {
然后,声明一个从1到1000的数值范围,并使用withPool语句以并行方式处理这些全部数值。我们使用println方法来输出处理该数值的线程名称,以及当前处理的数值。可以使用it变量来访问该数值。
def numbers = 1..1000;println "Example 2 - Part 1"withPool {numbers.eachParallel {println Thread.currentThread().getName() +": "+ it;}}
然后,使用withPool语句,不过现在该语句带有一个表示可使用的最大线程数的参数。
println "Example 2 - Part 2"withPool(4){List numberList = numbers.collectParallel { it *it}List smallNumberList = numberList.findAllParallel{ it < 100 }smallNumberList.eachParallel {println Thread.currentThread().getName() +": "+ it;}}
我们使用Groovy提供的三种方法并行处理该范围内的数值。可以使用collectParallel()方法来计算每个数值的平方。可以使用findAllParallel()方法来进行数值筛选,只接收那些小于100的数值。最后,可以使用eachParallel()方法来处理结果列表的所有方法。
可以使用其他方法并行处理符合某种数据结构的数据,例如minParallel()、maxParallel()或countParallel()。通过查看GPars API可以了解这些方法的所有详细信息。
以下屏幕截图显示了该应用程序的执行情况。

GParsPool类提供的另一个选项是使用callAsync()或executeAsyncAndWait()方法在不同的线程中调用闭包。第一个方法在不同的线程中启动闭包的执行,并且立即返回;而另一个方法则在返回之前等待闭包结束。让我们来看一个使用这些函数的例子。
首先,我们包含import语句,并且使用main()方法创建一个名为Example3的新类。在main()方法中,创建两个名为code1和code2的闭包。
import groovyx.gpars.GParsPoolclass Example3 {static main(args) {Closure code1 = {println "Closure 1: "+Thread.currentThread().getName()+": Start:"+new Date();Thread.currentThread().sleep(1000)println "Closure 1: "+Thread.currentThread().getName()+": End: "+new Date();}...Closure code2 = {println "Closure 2: "+Thread.currentThread().getName()+": Start:"+new Date();Thread.currentThread().sleep(2000)println "Closure 2: "+Thread.currentThread().getName()+": End: "+new Date();}
首先,使用Groovy的普通语法按顺序调用两个闭包。
println "Closure 1 sequential"code1.call();println "Closure 2 sequential"code2.call();
然后,使用GParsPool类的withPool方法,调用callAsync()方法以并发方式执行code1闭包,然后使用GParsPool类的executeAsyncAndWait()方法来执行code1闭包和code2闭包。
GParsPool.withPool {println "Closure 1 async";code1.callAsync();println "Closure 1 and closure 2 async with wait"GParsPool.executeAsyncAndWait(code1,code2);println "End"}println "Main end"}}
下面的屏幕截图显示了执行本例后的输出。

可以看到,可以方便地区分闭包的顺序执行和并发执行(通过线程的名称)。
GParsPool类的另一个选项是使用Map/Reduce编程模型来并行处理任何数据结构。当你在Groovy中使用Map/Reduce时,你的数据结构在内部转换为一个并行数组,你使用的所有方法都将作用于该数据结构。它类似于Java编程语言中的流处理。
让我们看一个使用这一功能的例子。首先,引入必要的import语句,并且创建一个名为Example4的新类,其中含有main()方法。在该方法中,声明一个1到10 000之间的范围。
import groovyx.gpars.GparsPoolclass Example4 {static main(args) {def numbers = 1..10000
然后,使用withPool语句和Fork/Join功能以并行方式处理该范围中的数值。我们使用parallel方法将该数值范围转换成一个并行数据结构,使用map方法将每个元素替换为该元素的平方,使用filter方法只保留那些小于100 000的数值,使用sum方法对列表中的所有元素求和。
GParsPool.withPool {int result = numbers.parallel.map{it*it}.filter{it < 100000}.sum();println result;
然后,我们看看该功能的其他示例。动态创建另一个介于1和1 000 000之间的数值范围,使用parallel方法将该范围转换成一个并行数据结构,使用filter方法只保留偶数,使用map方法将每个数值替换成其平方根,最后使用collection方法将并行数据结构转换成一个列表。
List numberList = (1..1000000).parallel.filter{it % 2 == 0}.map{Math.sqrt it}.collectionnumberList.forEach{println it;}}}}
执行该示例时,可以在控制台中看到输出的数值。
最后,关于GParsPool类我们要学习的最后一点是如何使用Promise来获取异步函数的值。让我们看一个使用该功能的例子。首先,创建一个名为Example5的类,其中含有main()方法,以及一个名为code1的闭包。
import static groovyx.gpars.GParsPool.withPool;class Example5 {static main(args) {Closure code1 = {println "Closure 1: "+Thread.currentThread().getName()+": Start:"+new Date();Thread.currentThread().sleep(1000)println "Closure 1: "+Thread.currentThread().getName()+": End: "+new Date();return new Date().toString();}
然后,使用withPool语句调用asyncFun()方法,以异步方式执行code1闭包,然后生成一个含有该方法结果的Promise。最后,使用该Promise的get()方法来获得code1闭包的结果。请注意,get()方法休眠调用线程,直到闭包完成执行。
withPool {def aCode1 = code1.asyncFun();def promise = aCode1();println "We have call the closure";println "The result is : "+promise.get();}}}
下面的屏幕截图展示了执行本例后得到的输出结果。

13.3.3 Fork/Join处理
GPars提供的Fork/Join实现类似于Java并发API中提供的Fork/Join实现。该功能的主要目的是利用分治技术解决问题。第一次执行该算法时,针对的是一个完整的问题,可以检查该问题的规模。如果其规模小于预先定义的规模,则可以直接解决问题。否则,你可以将问题划分为预定义数目的小问题并且进行异步递归调用,每个子问题进行一次递归调用。每次递归调用的处理过程都是相同的。你可以再次检查问题的规模,如果它小于预定义的规模,就可以直接进行求解;否则,再次分割问题并再次进行递归调用。当所有递归调用都结束后,启动这些调用的方法将再次得到控制权,获取每次调用的结果并将这些结果分组,最后返回结果。最后,通过分组求解许多小问题,我们求解了一个大规模问题。
请注意,并不是所有的算法都可以使用这种技术来求解,但是只要你可以使用这种技术,就可以对资源进行优化使用,得到非常好的性能结果。
GPars库提供了以下方法来使用Fork/Join框架。
runForkJoin():该方法创建一个Fork/Join执行过程。你必须指定算法的参数和实现该算法的闭包。递归调用具有相同的参数。forOffChild():创建一个新的子任务来执行子问题。该任务将在未来执行。该方法将待调度任务发送到正在执行全部任务的ForkJoinPool中,并且立即返回。runChildDirectly():在当前线程中运行子任务,并且当其结束执行时返回。getChildrenResults():等待所有子任务最终完成,并且返回一个含有结果的List对象。可以使用该列表来计算将由任务返回的结果。
让我们看一个如何使用GPars库的Fork/Join框架的示例。我们将实现一个函数,它计算目录中以.log为扩展名的文件数目。首先,包含必要的import语句并创建一个名为Example6的类,其中含有main()方法。
import static groovyx.gpars.GParsPool.withPool;import static groovyx.gpars.GParsPool.runForkJoin;class Example6 {static main(args) {
然后,在withPool命令中调用runForkJoin()方法,将File对象作为参数传递。该File对象含有我们要开始寻找扩展名为.log的文件的路径。我们必须指定算法的代码。对于作为参数接收的目录,我们处理其中包含的所有文件和目录。如果是文件,检查其扩展名是否为log。如果扩展名是log,就增加计数器的值。如果是目录,那么使用forkOffChild()方法进行异步递归调用。
当处理完所有的项目后,就得到了所有子任务的结果,并将这些任务的结果和计数器相加。最后的值就是返回的结果。
withPool() {def count = runForkJoin(new File("c:\\windows")) {file ->long count = 0file.eachFile {if (it.isDirectory()) {println "Forking a child task for $it"forkOffChild(it)} else {if (it.getName().endsWith("log")) {count++;println it.getName();}}}return count + (childrenResults.sum(0))}
要注意,子任务也可以有子任务,以此类推。最后,当初始调用结束时,输出最终结果。
println "Total: "+ count;}}}
执行本例时,你可以看到文件的总数。
13.3.4 Actor
Actor实现了消息传递并发模型。每个Actor都是一个独立的对象,它向其他Actor发送消息并且接收来自其他Actor的消息。Actor和线程之间并没有关联。线程可以执行不同的Actor,而一个Actor也可以由不同的线程执行。Actor没有共享状态,GPars保证了Actor的代码可被执行,这样就不会丢失消息。每当线程被分配给一个Actor时,内存也会随之同步,因此不需要显式同步。Actor有两种类型。
- 无状态Actor:基于
DynamicDispatchActor类或者ReactiveActor类。它们无法追踪此前曾有哪些消息到达。 - 有状态Actor:基于
DefaultActor类。该Actor可以管理内部状态,每个消息都可以改变该状态以及处理该消息的方式。
Actor最大的好处之一在于你可以在系统中获得吞吐量。只有在需要处理消息时,才会执行Actor,因此你可以拥有大量需要少量线程运行的Actor。
当使用Actor时,你会使用下述方法来做最常见的操作。
send():该方法向Actor异步发送消息。该方法将立即返回,并不会等待响应。sendAndWait():该方法向Actor发送消息并且等待响应。sendAndContinue():该方法向Actor发送消息并且立即返回。它接收一个闭包作为参数,并在该消息的响应到达时执行该闭包。sendAndPromise():该方法向Actor发送消息并且返回一个Promise,可以通过该Promise来获得该消息的响应。react():该方法将被调用来处理下一条消息。通常,该方法会包含在一个循环语句中,以处理Actor接收到的所有消息。reply():该方法向消息的发送者发送应答。forward():该方法允许我们将接收到的消息发送给另一个Actor。join():该方法等待Actor结束。
还有其他不同的方法可以创建Actor。
你可以使用Actors类的actor()方法。在这种情况下,你可以使用闭包来指定Actor的代码。Actor将立即开始执行。
你可以扩展DefaultActor类并且实现act()方法。在这种情况下,我们必须调用Actor的start()方法来开始其执行过程。
你可以扩展DynamicDispatchACtor类,并且实现onMessage()方法的一个或多个版本(Actor可接收到的每种消息各一个版本)。
最后,Actor有生命周期以及一些相关的方法,你可以实现这些方法来在该生命周期的确定状态下执行操作。这些方法包括:
afterStart()afterStop()onTimeOut()onInterrupt()onException()
这些方法的功能恰如其名,因此无须额外描述。
下面用三个例子来说明如何使用Actor。在第一个例子中,仅创建两个基本的Actor对象,并且将在它们之间发送一条消息。
创建一个名为Example7的类,其中含有main()方法。
import groovyx.gpars.actor.Actorimport groovyx.gpars.actor.Actorsclass Example7 {static main(args) {
然后,使用Actor类的actor()方法创建一个Actor。在该Actor的代码中,我们包含了react()方法的代码。在我们的例子中,当消息到达时,在控制台输出它,然后发送对该消息的响应,其中包括当前线程的名称和文本“Ok”。
def receiver = Actors.actor {println Thread.currentThread().getName()+": Receiver is running"react { msg ->println Thread.currentThread().getName()+": Recevier: I'vereceived a message: "+msgreply Thread.currentThread().getName()+": Ok"}println Thread.currentThread().getName()+": Receiver has finished"}
然后,我们再次使用actors()方法创建另一个Actor。在这种情况下,我们使用send()方法向另一个Actor发送一条消息,其中还包含了在Actor收到消息时react()方法要执行的代码。它将在控制台中输出消息。
def sender = Actors.actor {println Thread.currentThread().getName()+": Sender is running"receiver.send Thread.currentThread().getName()+": From sender toreceiver"react { msg ->println Thread.currentThread().getName()+": Sender: The responsehas arrived: "+msg}println Thread.currentThread().getName()+": Sender has finished"}
正如之前解释的那样,两个Actor都将立即开始执行。最后,在main()方法中,我们使用join()方法等待两个线程结束。
sender.join();receiver.join();}}
下面的屏幕截图显示了执行本例后的输出结果。

你可以看到发送方发送的消息如何到达接收方,而接收方如何将应答发送到发送方。
第二个示例是生产者/消费者问题的实现。首先,我们将实现消费者类。创建一个名为Consumer的类,并且指定它实现DefaultActor类。
import groovyx.gpars.actor.Actorimport groovyx.gpars.actor.DefaultActorclass Consumer extends DefaultActor {
然后,实现包含Actor主代码的act()方法。我们使用循环语句处理所有消息和react()方法,该方法将在Actor接收到的每条消息上调用。我们将参数5000传递给react()方法。如果Actor等待了5秒钟却没有收到消息,那么它将抛出一个超时错误并结束其执行。对于每条消息,我们只在控制台上输出关于该消息和发送方的信息。
void act() {loop {react(5000) { msg ->println "*****************************";println "Thread Name: "+Thread.currentThread().getName();println "Sender: "+sender.remoteClass;println "Message: "+msg;println "*****************************";}}}
然后,我们执行Actor的一些生命周期方法,以便在控制台中输出有关这些事件的信息。
void afterStart() {println "Consumer: After Start";}void afterStop(List undeliveredMessages) {println "Consumer: After Stop";println "Undelivered Messages: "+undeliveredMessages.size()}void onInterrupt(InterruptedException e) {println "Consumer: Interrupted"e.printStackTrace()terminate()}void onTimeout() {println "Consumer: Timeout"terminate()}void onException(Throwable e) {println "Consumer: An exception has ocurred"e.printStackTrace()}}
现在该实现生产者类了。创建一个名为Producer的类,并且指定它实现DefaultActor类。该类有两个属性,名称分别是发送消息的Producer和Consumer,并且使用构造函数来初始化它们。
import java.lang.invoke.AbstractValidatingLambdaMetafactoryimport java.util.Listimport groovyx.gpars.actor.DefaultActorimport groovyx.gpars.actor.Actorclass Producer extends DefaultActor {private Actor consumer;private String name;def Producer (Actor consumer, String name) {this.consumer = consumerthis.name = name}
现在,使用Actor的主代码实现act()方法。它将向消费者发送100条消息并结束执行。
void act() {def i;for (i = 0; i<100; i++) {def msg = Thread.currentThread().getName()msg+= ": "+namemsg+= ": Message "+i;consumer.send msg;Thread.currentThread().sleep(500);}}
最后,我们编写afterStop()方法的代码,该方法在控制台输出一条消息。
void afterStop(List undeliveredMessages) {println name+": After Stop";}}
现在,创建一个名为Example8的类,其中含有main()方法。
import groovyx.gpars.actor.Actorimport groovyx.gpars.actor.DefaultActorclass Example8 {static main(args) {Consumer consumer = new Consumer();consumer.start();Producer producer1 = new Producer(consumer,"Producer 1");Producer producer2 = new Producer(consumer, "Producer 2");producer1.start();Thread.currentThread().sleep(300);producer2.start();consumer.join();println "Main end"}}
在main()方法中,我们创建一个消费者和两个生产者,并且使用start()方法启动三个Actor。我们使用join()方法等待消费者Actor结束。在生产者发送TimeOut异常5秒钟后,该Actor将会结束。
下面的屏幕截图显示了执行该例时的输出结果。

你可以看到生产者如何结束其执行并且输出afterStop()方法的消息。然后,消费者出现一个TimeOut异常,并且执行onTimeOut()和afterStop()方法。然后,主程序结束其执行。
最后一个关于Actor的示例将向你展示如何使用无状态Actor。首先,创建一个名为Event的类,该类有两个属性:一个名为msg的String属性和一个名为date的Date属性。
class Event {String msg;Date date;@Overridepublic String toString() {return "Event: "+msg+": on "+date;}}
现在,创建一个名为Logger的类,并且指定它扩展DynamicDispatchActor类。我们实现onMessage()方法的三个版本,它们分别接收Event对象、String对象和Exception对象作为参数。我们在控制台中仅输出有关它收到的消息的信息。
class Logger extends DynamicDispatchActor {def onMessage (Event event) {println "Logger: "+Thread.currentThread().getName()+": "+event;replyIfExists "Logger: Event received";}def onMessage(String msg) {println "Logger: "+Thread.currentThread().getName()+": Direct mgs: "+msg;replyIfExists "Logger: Direct msg received";}def onMessage(Exception e) {println "Logger: "+Thread.currentThread().getName()+": Error:"+e.getLocalizedMessage();replyIfExists "Logger: Error received"}}
最后,创建一个名为Example9的类以及main()方法。首先,创建一个Logger类的Actor,并使用start()方法启动其执行。
import groovyx.gpars.actor.Actorimport groovyx.gpars.actor.Actorsclass Example9 {static main(args) {Logger logger = new Logger();logger.start();
然后,使用Actors类的actor()方法创建另一个Actor。在代码中,我们向Logger类发送三个消息(每种类型一个),并且包含用于处理三个响应的代码。
def tester = Actors.actor {println "Tester: "+Thread.currentThread().getName()+": is running"loop(3) {react(1000) { msg ->println "Tester: "+Thread.currentThread().getName()+": I've received a message: "+msg}}Event event = new Event()event.msg = "I'm an event"event.date = new Date()logger.send eventlogger.send "I'm a message"Exception e = new Exception("I'm an exception")logger.send e;println "Tester: "+Thread.currentThread().getName()+": Tester has finished"}
最后,使用join()方法等待tester Actor结束,使用stop()方法停止logger Actor,并且使用join()方法等待其最终结束。
tester.join();logger.stop();logger.join();println "Main End"}}
下面的屏幕截图展示了执行本例后的输出结果。

13.3.5 Agent
Agent保护可变数据对象,使之可以在线程之间安全地共享。Agent接受消息并以异步方式处理它们。消息是可以在Agent内部执行的函数或Groovy闭包。函数的返回值或闭包将成为Agent的新值/状态。函数或闭包将Agent的当前值/状态作为参数。
我们发送给Agent的命令是按顺序存储的,而且是一个接一个地处理,因此不会出现任何竞争条件。
要创建Agent,需要创建Agent类的一个新对象,用Agent中存储的取值类型对Agent类进行参数化。
当你使用Agent时,通常使用以下方法。
send():该方法向Agent发送一个命令。addListener():该方法添加一个监听器,每当Agent的值发生变化时就会得到通知。addValidator():该方法添加一个类似于监听器的验证器,但是可以拒绝对抛出异常的Agent的值做出更改。
让我们实现一个示例,看看如何使用Agent。首先,创建一个名为Account的类,它含有一个名为value的内部整型属性,一个名为increment()的方法(用于递增value属性的值),一个名为decrement()的方法(用于递减value属性的值),以及返回value属性值的方法。
class Account {private int value = 0;def void increment (int amount) {println "Account.increment: "+Thread.currentThread().getName()+": "+amount;value+=amount}def void decrement (int amount) {println "Account.decrement: "+Thread.currentThread().getName()+": "+amount;value-=amount}def int getValue() {return value;}}
然后,创建一个名为Example10的类以及main()方法。创建一个存储Account对象的新Agent。
class Example10 {static main(args) {Agent agent = new Agent<Account>(new Account())
然后,创建一个Actor,对该Agent的Account对象调用100次increment()方法。你可以使用it变量来访问该Agent的当前值。
def incrementer = Actors.actor {for (def i=0; i<100; i++) {agent.send {it.increment(1000)}}}
现在,创建另一个Actor。该Actor将对存储在该Agent中的Account对象调用99次decrement()方法。
def decrementer = Actors.actor {for (def i=0; i<99; i++) {agent.send {it.decrement(1000)}}}
最后,等待两个Actor执行结束并且输出该Agent的最终值。
incrementer.join()decrementer.join()println "Final value: "+agent.val.getValue()}}
如果执行该示例,你将会看到结果为1000(100次递增操作和99次递减操作)。
13.3.6 Dataflow
Dataflow为生产者和消费者之间共享数据提供了安全通道。Dataflow最基本的元素是Dataflow变量。你只需创建一个Dataflows类的对象,然后我们可以在其之上定义变量。这些变量有两个重要特征。
- 只能设置一次值。
- 当一个任务试图使用Dataflow变量的值时,它的执行线程将被阻塞,直到该变量有值为止。
使用Dataflow变量可以获得以下好处。
- 没有竞争条件。
- 不需要显式地使用锁或其他同步机制。
- 如果存在由Dataflow变量引发的死锁,可以确定其原因。
我们来看一个使用Dataflow变量的例子。首先,创建一个名为Example1的类,其中含有main()方法。
import static groovyx.gpars.dataflow.Dataflow.task;import java.util.concurrent.TimeUnitimport groovyx.gpars.dataflow.Dataflows;class Example11 {static main(args) {
现在,创建一个Dataflows对象和一个Date对象(其中含有该方法启动执行的日期)。
def store = new Dataflows()def mainStart = new Date();println "Main: Start "+mainStart
现在,启动一个逻辑任务,它将由另一个使用task函数的线程执行。将其执行线程休眠1秒钟,然后在Dataflows对象中创建一个变量,并且为其赋值为3。
task {TimeUnit.SECONDS.sleep(1)store.x = 3}
现在,创建另一个与前述任务类似的任务。我们将执行线程休眠2秒钟,然后将一个名为y的变量指派给它,其值为4。
task {TimeUnit.SECONDS.sleep(2)store.y = 4}
然后,创建第三个任务,该任务将计算变量x和y之间的和,并且将该值存储在另一个名为z的Dataflow变量中。
task {def start = new Date()println "Calculus Task: "+startstore.z = store.x + store.ydef end = new Date()println "Calculus Task: "+end}
最后,在main()方法中输出变量z的值。
println "Main: The final result is: "+store.zprintln "Main: End"}}
下面的屏幕截图显示了执行这个例子后的输出结果。

我们还可以创建一个DataflowVariable类的对象,并且使用<<运算符给它赋值。例如,创建一个名为Example13的类以及main()方法,并且创建一个名为data的DataflowVariable类的对象。
import static groovyx.gpars.dataflow.Dataflow.task;import java.util.concurrent.TimeUnitimport groovyx.gpars.dataflow.DataflowVariable;class Example13 {static main(args) {def data = new DataflowVariable()
现在,创建一个任务将其执行线程休眠2秒钟,并且使用<<运算符将值2赋给该变量。
task {println Thread.currentThread().getName()+": Wait two seconds toset the value"TimeUnit.SECONDS.sleep(2);data << 2;}
最后,在main()方法中包含一个输出data变量取值的语句。
println Thread.currentThread().getName()+" : Bind handler : "+data.val;}}
当你执行该示例时,将看到任务所输出的消息,以及两秒之后由main()方法所输出的消息,其中含有DataflowVariable对象的值。
Dataflow提供的另一个元素是Dataflow广播。它允许我们在生产者和消费者之间发送数据,就像在它们之间存在一个队列一样。它提供了一种发布—订阅机制,以便支持多个生产者与一个或多个消费者交互的情况。
下面来看看这种机制是如何运作的。首先,创建一个名为Producer的类。它有两个私有属性:一个名为broadcast的DataflowBroadcast对象和一个名为name的String对象。使用该类的构造函数来初始化这两个属性。
import java.util.concurrent.TimeUnitimport groovyx.gpars.dataflow.DataflowBroadcast;class Producer {private DataflowBroadcast broadcastprivate String namepublic Producer (DataflowBroadcast broadcast, String name) {this.broadcast = broadcastthis.name = name}
现在,实现一个名为execute()的方法。在该方法中,使用<<运算符将100个String对象写入broadcast对象。在每条消息之间,将执行线程休眠500毫秒。
public void execute() {for (int i=0; i<100; i++) {def msg = name + " MSG "+i+" : "+new Date();broadcast << msgTimeUnit.MILLISECONDS.sleep(500);}}}
现在,创建一个名为Consumer的类。该类将和Producer类具有相同的属性。使用该类的构造函数来初始化这两个属性。
import groovyx.gpars.dataflow.DataflowBroadcastimport groovyx.gpars.dataflow.DataflowReadChannelclass Consumer {private DataflowBroadcast broadcastprivate String namepublic Consumer (DataflowBroadcast broadcast, String name) {this.broadcast = broadcastthis.name = name}
现在,实现execute()方法。首先,创建一个DataflowReadChannel类的对象,读取来自DataflowBroadcast的值。然后,使用val函数编写200条消息。该函数将会休眠当前线程,直到DataflowBroadcast中的新数据可用。
public void execute() {DataflowReadChannel stream = broadcast.createReadChannel()for (int i=0; i<200; i++) {println "Consumer "+name+": "+stream.val}}}
我们有了生产者和消费者。现在该让它们工作了。创建一个名为Example12的类,其中含有main()方法。创建一个DataflowBroadcast对象、两个生产者和三个消费者。创建一个线程来执行每个生产者和每个消费者。然后,使用join()方法等待它们结束。
import groovyx.gpars.dataflow.DataflowBroadcastimport static groovyx.gpars.dataflow.Dataflow.taskclass Example12 {static main(args) {DataflowBroadcast dataflow = new DataflowBroadcast()def producer1, producer2, consumer1, consumer2, consumer3Thread thread1 = Thread.start {producer1 = new Producer(dataflow, "Producer 1")producer1.execute()}Thread thread2 = Thread.start {producer2 = new Producer(dataflow, "Producer 2")producer2.execute()}Thread thread3 = Thread.start{consumer1 = new Consumer(dataflow, "Consumer 1")consumer1.execute()}Thread thread4 = Thread.start {consumer2 = new Consumer(dataflow, "Consumer 2")consumer2.execute()}Thread thread5 = Thread.start {consumer3 = new Consumer(dataflow, "Consumer 3")consumer3.execute()}thread1.join()thread2.join()thread3.join()thread4.join()thread5.join()println "Main: end"}}
下面的屏幕截图显示了执行本例后的输出结果。

可以看到由生产者生成的每条消息如何到达三个消费者。
Dataflow提供的另一个功能是使用select()函数从多个通道中选择一个值。该函数接收一个通道列表作为参数,它将从全部含有可读值的通道中选择一个。该函数返回一个SelectResult对象,其中含有返回的值和它所选通道的信息。这种机制也是可配置的,例如,对某些渠道进行优先级排序。
我们来看看这种机制是如何运作的。首先,创建一个名为Example14的类,其中含有main()方法。创建名为source1、source2和source3的三个DataflowVariable对象。
import static groovyx.gpars.dataflow.Dataflow.taskimport static groovyx.gpars.dataflow.Dataflow.selectimport java.util.concurrent.TimeUnitimport groovyx.gpars.dataflow.DataflowVariableclass Example14 {static main(args) {def source1 = new DataflowVariable()def source2 = new DataflowVariable()def source3 = new DataflowVariable()
现在,创建三个任务来为每个数据源提供一个值。每个任务在为其DataflowVariable对象赋值之前,会将其执行线程休眠不同的时间。
task {TimeUnit.SECONDS.sleep(3);source1 << "source1"}task {TimeUnit.SECONDS.sleep(5);source2 << "source2"}task {TimeUnit.SECONDS.sleep(1);source3 << "source3"}
现在,使用select函数从这些数据源获取值,并且将其输出到控制台。
def result = select([source1, source2, source3])println "Main: "+result.select()}}
下面的屏幕截图显示了执行该例后的输出结果。

在本例中,source3对象首先得到值,因此select函数将在一秒钟之后返回它。
我们要分析的最后一种Dataflow机制是运算符。运算符从输入通道接收值,并且生成新值写入输出通道。所有这些通道都是Dataflow的变量。运算符将等待所有输入通道,直到它开始执行为止。
我们来看看这种机制是如何运行的。创建一个名为Example15的类,其中含有main()方法。创建名为a、b、c、d的四个DataflowVariable对象。
import groovyx.gpars.dataflow.DataflowVariable;import static groovyx.gpars.dataflow.Dataflow.operator;import java.util.concurrent.TimeUnitclass Example15 {static main(args) {def a = new DataflowVariable();def b = new DataflowVariable();def c = new DataflowVariable();def d = new DataflowVariable();
现在使用operator命令创建一个名为op的新运算符。它接收三个输入,即Dataflow变量a、b和c,并且返回Dataflow变量d的值。我们使用bindOutput函数来确定输出的值。
def op = operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->println "Operator"bindOutput 0, x + y + z}
最后,为变量a、b和c赋值,并且使用变量d的val属性将DataflowVariable的值输出到控制台。
a << 3;b << 5;c << 7;println "Main: "+d.val}}
当我们将值赋给三个DataflowVariable对象时,运算符执行其代码。当其完成之后,DataflowVariable d就有了值,并且在main()方法的最后一条语句中输出。
下面的屏幕截图显示了执行本例后的输出结果。

