13.1 Clojure的并发处理
Clojure是一种动态、通用的函数式编程语言,它基于Rich Hickey创建的Lisp编程语言。可在Clojure官网下载该语言的最新版本(撰写本书时是1.8.0版),还可以找到有关使用Clojure进行编程的文档和指南。你可以在最流行的Java IDE(如Eclipse)中安装Clojure支持环境。另一个有用的网页是http://clojure-doc.org,可以在上面找到社区驱动的Clojure编程语言文档站点。
本节将介绍Clojure编程语言中最重要的并发元素及其用法。本章不打算介绍Clojure编程语言,读者可以查看相关评论网站,学习如何使用Clojure编程。
Clojure编程语言的设计目标之一是使并发编程更加容易。针对这一目标,要注意两个重要事项。
- Clojure数据结构是不可变的,所以它们可以在线程之间共享而不会有任何问题。稍后你就会看到,这并不意味着并发应用程序中不能拥有可变值。
- Clojure将标识和值的概念区分开来,几乎消除了对显式锁的需要。
下面介绍一下Clojure编程语言提供的最重要的并发结构。
13.1.1 使用Java元素
使用Clojure编程时,可以使用所有的Java元素(包括并发元素),因此可以创建线程或执行器,或者使用Fork/Join框架。然而这并不是一种好的实践方法,因为Clojure本身提供了更简单的并发编程,但是可以显式地创建一个`Thread,如以下代码块所示:
(ns example.example1)(defn example1 ( [number](println (format "%s : %d"(Thread/currentThread) number))))(dotimes [i 10] (.start (Thread. (fn[] (example1 I)))))
在这段代码中,首先,定义一个名为example1的函数,它接收一个数值作为参数。在该函数内部,编写关于执行该函数的Thread的信息,以及参数中的number值。
然后,创建并执行10个Thread对象。每个线程都将调用函数example1。
在下面的截图中,可以看到这段代码的执行结果。

在上面的截图中,可以看到对于全部10个线程来说Thread的名称都是不同的。
13.1.2 引用类型
如前所述,Clojure数据结构不可变,但是Clojure提供了一些机制,允许使用引用类型处理可变变量。根据协调还是不协调,同步还是异步,可以对引用类型进行分类。
- 协调型:两个或多个操作相互协作时。
- 不协调型:该操作不对其他操作产生影响时。
- 同步型:调用者等待操作结束时。
- 异步型:调用者不等待操作结束时。
Clojure编程语言中最重要的引用类型有如下几种。
- Atom
- Agent
- Ref
下面一起了解一下如何使用这些元素。
- Atom对象
Atom本质上是对Java编程语言的原子引用。这种变量的变化对所有线程立即可见。我们将用下面的函数处理Atom,它们是一种不协调而且同步的引用类型。
atom:定义一个新的Atom对象。swap!:根据函数的结果,将Atom的值原子地更改为新值。它遵循(swap! atom function)格式,其中atom是Atom对象的名称,而function是返回Atom新值的函数。reset!:将Atom的值设置为新值。它遵循(reset! atom value)格式,其中atom是Atom对象的名称,而value是该Atom对象的新值。compare-and-set!:如果实际值与参数所传递的值相同,则以原子方式改变Atom对象的值。它遵循(compareand-set! atom old-value new-value)格式,其中atom是Atom对象的名称,old-value是Atom对象预期的实际值,而new-value是想要指派给Atom对象的新值。
下面介绍一个操作Atom对象的示例。首先,声明一个名为company的函数,它接收两个名为account和salary的参数。稍后将看到,account是一个Atom对象,而salary是一个数值。我们使用swap!函数增加account对象的值。然后,在控制台中输出执行该函数的线程信息,并使用@(dereferencing)函数来输出该Atom对象的实际值。
(ns example.example2)(defn company ( [account salary](swap! account + salary)(println (format "%s : %d"(Thread/currentThread) @account))))
然后,创建一个名为user的类似函数。它接收Atom对象account对象和另一个名为money的变量作为参数。我们仍然使用swap!函数,不过在这种情况下,是为了减小Atom对象的值。
(defn user ( [account money](swap! account - money)(println (format "%s : %d"(Thread/currentThread) @account))))
然后,创建一个名为myTask的函数,它接收一个名为account的Atom对象作为参数,并调用company函数1000次(值为100),而且user函数的值为100,这样account对象的最终值应该是相同的。
(defn myTask ( [account](dotimes [i 1000](company account 100)(user account 100)(Thread/sleep 100))))
最后,将myAccount对象创建为一个Atom对象(其初始值为0),并创建10个线程来执行myTask函数。
(def myAccount (atom 0))(dotimes [i 10] (.start (Thread. (fn[] (myTask myAccount)))))
下面的屏幕截图显示了这个例子的执行情况。

在该图中,可以看到运行myTask函数的不同线程,而且Atom对象myAccount的最终值如预期那样为0。
- Agent对象
Agent是在将来某个时刻异步更新的引用。它在整个生命周期中都与某个存储位置相关联,而你只能改变该位置的值。Agent是一种不协调的数据结构。
可以通过以下函数使用Agent。
agent:建立一个新的Agent对象。send:确定Agent的新值。它遵循(send agent function value)语法,其中agent是我们想修改的Agent的名称,function是为计算Agent新值所要执行的函数,而value是Agent的实际值,将其传递给function可以计算Agent的新值。send-of:当想要使用函数来更新一个阻塞型函数的值(例如,读取一个文件)时,可以使用该函数。send-of函数将立即返回,而且用于更新Agent值的函数将在另一个线程中继续执行。它遵循与send函数相同的语法。await:等待(阻塞当前线程),直到Agent所有未完成的操作完成为止。它遵循语法(await agent),其中agent是要等待的Agent的名称。await-for:对于实际的Agent,你可以使用该函数等待其参数指定的毫秒数。该函数返回一个布尔值,以指示Agent是否已被更新。它遵循语法(await-for time agent),其中agent是Agent的名称,而time是要等待的毫秒数。agent-error:如果Agent出现故障,则返回Agent抛出的异常。它遵循语法(agent-error agent),其中agent是Agent的名称。shutdown-agents:结束处于运行状态的Agent的执行。该函数遵循(shutdown-agents)语法。
下面来看一个例子,感受一下如何使用Agent。
首先,创建一个Agent,其初始值为300。
(ns example.example3)(def myAgent (agent 300))
然后,实现一个名为myTask的函数。我们将重复如下过程:首先使用send方法将Agent的值增加1000倍,然后用send方法将其递减,这样Agent的最终值就应该是相同的。
(defn myTask ( [a](dotimes [i 1000](send a + 100)(send a - 100)(println (format "%s : %d"(Thread/currentThread) @a))(Thread/sleep 100))))
最后,创建10个线程来执行myTask函数。
(dotimes [i 10] (.start (Thread. (fn[] (myTask myAgent)))))
下面的屏幕截图显示了执行本例时的输出。

在该屏幕截图中可以看到,有不同的线程执行myTask函数,而且Agent的值像预期那样最后达到300。
13.1.3 Ref对象
最后,来看看Ref对象。这类对象是Clojure中唯一的协调引用类型,也是一种同步数据结构。这类对象允许在事务处理中并发地修改多个引用,因此要么所有引用都被修改,要么任何一个引用都不被修改。
可以使用下述函数操作Ref对象。
ref:创建一个新的Ref对象。alter:该函数以安全方式修改引用值的取值。它遵循语法(alter ref function),其中,ref是要修改的Ref对象名称,而fucntion是用于获取该引用的新值的函数。ref-set:该集合确定了Ref对象的值。它遵循语法(ref-set ref value),其中ref是要修改的Ref对象的名称,而value是Ref对象的新值。conmute:该函数也可改变Ref的值,它遵循语法(conmute ref function),其中ref是想要修改的Ref对象的名称,而function则是计算Ref新值的函数。dosync:以事务处理的方式执行参数所传递的表达式。如果在表达式执行期间发生异常,则不会执行与Ref对象相关的操作。另一方面,alter函数和commuted函数都必须在dosync函数内部执行。它遵循语法(dosync expression),其中expression是待执行的表达式。
下面看一个操作Ref对象的例子。
首先,声明名为account1和account2的两个对象,并且将它们初始化为0。
(ns example.example4)(def account1 (ref 0))(def account2 (ref 0))
然后,定义一个名为myTask的函数,它将收名为source和destination的两个Ref对象作为参数。我们减小source的值并增加destination的值1000次,就像两个银行账户之间的交易一样。我们使用alter函数来改变Ref对象的值,因此在dosync函数中必须包含对它的两次调用。
(defn myTask ( [source, destination](dotimes [i 1000](dosync(alter source - 100)(alter destination + 100))(println (format "%s : %d - %d"(Thread/currentThread)@source @destination))(Thread/sleep 100))))
最后,创建10个线程来调用myTask函数,其中源是account1,目标是account2;再创建另外10个线程来调用myTask函数,其中源是account2,目标是account1。
(dotimes [i 10] (.start (Thread. (fn[] (myTask account1 account2)))))(dotimes [i 10] (.start (Thread. (fn[] (myTask account2 account1)))))
下面的屏幕截图显示了执行这个例子时的输出。

在该屏幕截图中,可以看到执行myTask函数的不同线程,以及两个引用的最终值像预想的那样为0。
13.1.4 Delay
Delay是一种数据结构,当其被解引用后才进行首次计算以获取值。可以使用下述函数操作Delay。
delay:使用该函数声明一个新的Delay。@:这是解引用函数。可以使用它读取Delay的值。realized?:该函数将返回一个布尔值,用于指示Delay是否已初始化。
下面来看一个关于Delay的例子。
首先,声明名为now、otherNow和later的三个对象。在这三个对象中,我们将存储一个含有当前日期的字符串。later对象将被定义为一个Delay。
(ns example.example5)(def now (.toString (java.util.Date.)))(def otherNow (.toString (java.util.Date.)))(def later (delay (.toString (java.util.Date.))))
然后,定义myTest函数。首先,输出now变量的值。然后,将当前线程休眠5秒钟,然后再输出otherNow变量和later变量的值。对于later变量,必须使用解引用函数获得它的值。
(defn myTest ([](println (format "%s" now))(Thread/sleep 5000)(println (format "%s : %s" otherNow @later))))(myTest)
下面的屏幕截图显示了执行这个例子时的输出结果。

在该屏幕截图中,可以看到Delay的值一直没有初始化,直到使用解引用函数后才获得值。
13.1.5 Future
Future是在另一个线程中计算的一段代码。可以使用下面的函数操作Future。
future:使用该函数创建一个新的Future。realized?:使用该函数可检验Future是否已执行完成。- 解引用函数(
@):使用该函数可获得Future的值。调用解引用函数阻塞当前线程,直到Future执行完成并返回值为止。 deref:使用该函数阻塞当前线程一段时间。如果该时间间隔结束后Future仍未完成执行,那么该函数返回。
下面看一个使用Future的例子。
首先,声明一个名为initializeEnv的函数,该函数让其执行线程休眠1秒钟。该函数输出关于执行这段代码的线程的信息,最后返回"Ok"值。
(ns example.example6)(def initializeEnv ( future(println (format "%s : Initializing environment"(Thread/currentThread)))(Thread/sleep 1000)(println (format "%s : Environment initialized"(Thread/currentThread)))"Ok"))
然后,声明另一个名为initilizeApp的函数。该函数与initializeEnv函数等价,只不过它将使执行线程休眠3秒钟。
(def initializeApp ( future(println "Initializing app")(Thread/sleep 3000)(println "Environment app")"Ok"))
最后,使用一组命令调用realized?函数和解引用函数。
(println (realized? initializeEnv))(println (realized? initializeApp))(println @initializeEnv)(println (realized? initializeEnv))(println (realized? initializeApp))(println @initializeApp)
执行该代码时,可以看到两个Future在同一时间启动执行,initializeEnv函数首先结束执行,而且initializeEnv将向realized?函数返回true值。然后,initilizeApp函数将结束其执行。
13.1.6 Promise
Promise是与Future相类似的一种机制。主要的区别在于它并不会计算某段代码;你要显式地确定它的值。可用于Promise的函数如下所示。
promise:使用该函数可创建一个新的Promise。realized?:使用该函数可以检查Promise是否有值。- 解引用函数(
@):使用该函数可以获取Promise的值。调用解引用函数阻塞当前线程,直到Promise完成其执行并且返回值为止。 deref:使用该函数来阻塞当前线程一段时间。如果这段时间结束且Promise尚未完成执行,则该函数返回。deliver:使用该函数来确定Promise的返回值。
让我们看一个使用Promise的例子。首先,定义一个名为myPromise的新Promise。
(ns example.example7)(def myPromise (promise))
然后,创建一个名为myTest的函数,它将接收一个Promise作为参数。等待5秒钟,然后在验证该Promise没有值之后,使用deliver函数为其确定值。
(defn myTest ([p](def now (java.util.Date.))(println (format "Start : %s" now))(Thread/sleep 5000)(def now (java.util.Date.))(println (format "End : %s" now))(println (realized? p))(deliver p "ok")))
最后,启动一个线程来执行myTest函数,并且使用realized?函数和解引用函数来验证Promise是否有值,并且将其输出。
(def now (java.util.Date.))(println (format "Main : %s" now))(println (realized? myPromise))(println @myPromise)(def now (java.util.Date.))(println (format "Main : %s" now))(println (realized? myPromise))
下面的屏幕截图展示了执行本例后的输出结果。

