13.1 Clojure的并发处理

Clojure是一种动态、通用的函数式编程语言,它基于Rich Hickey创建的Lisp编程语言。可在Clojure官网下载该语言的最新版本(撰写本书时是1.8.0版),还可以找到有关使用Clojure进行编程的文档和指南。你可以在最流行的Java IDE(如Eclipse)中安装Clojure支持环境。另一个有用的网页是http://clojure-doc.org,可以在上面找到社区驱动的Clojure编程语言文档站点。

本节将介绍Clojure编程语言中最重要的并发元素及其用法。本章不打算介绍Clojure编程语言,读者可以查看相关评论网站,学习如何使用Clojure编程。

Clojure编程语言的设计目标之一是使并发编程更加容易。针对这一目标,要注意两个重要事项。

  • Clojure数据结构是不可变的,所以它们可以在线程之间共享而不会有任何问题。稍后你就会看到,这并不意味着并发应用程序中不能拥有可变值。
  • Clojure将标识和值的概念区分开来,几乎消除了对显式锁的需要。

下面介绍一下Clojure编程语言提供的最重要的并发结构。

13.1.1 使用Java元素

使用Clojure编程时,可以使用所有的Java元素(包括并发元素),因此可以创建线程或执行器,或者使用Fork/Join框架。然而这并不是一种好的实践方法,因为Clojure本身提供了更简单的并发编程,但是可以显式地创建一个`Thread,如以下代码块所示:

  1. (ns example.example1)
  2. (defn example1 ( [number]
  3. (println (format "%s : %d"(Thread/currentThread) number))
  4. ))
  5. (dotimes [i 10] (.start (Thread. (fn[] (example1 I)))))

在这段代码中,首先,定义一个名为example1的函数,它接收一个数值作为参数。在该函数内部,编写关于执行该函数的Thread的信息,以及参数中的number值。

然后,创建并执行10个Thread对象。每个线程都将调用函数example1

在下面的截图中,可以看到这段代码的执行结果。

13.1 Clojure的并发处理 - 图1

在上面的截图中,可以看到对于全部10个线程来说Thread的名称都是不同的。

13.1.2 引用类型

如前所述,Clojure数据结构不可变,但是Clojure提供了一些机制,允许使用引用类型处理可变变量。根据协调还是不协调,同步还是异步,可以对引用类型进行分类。

  • 协调型:两个或多个操作相互协作时。
  • 不协调型:该操作不对其他操作产生影响时。
  • 同步型:调用者等待操作结束时。
  • 异步型:调用者不等待操作结束时。

Clojure编程语言中最重要的引用类型有如下几种。

  • Atom
  • Agent
  • Ref

下面一起了解一下如何使用这些元素。

  • Atom对象

Atom本质上是对Java编程语言的原子引用。这种变量的变化对所有线程立即可见。我们将用下面的函数处理Atom,它们是一种不协调而且同步的引用类型。

  • atom:定义一个新的Atom对象。
  • swap!:根据函数的结果,将Atom的值原子地更改为新值。它遵循(swap! atom function)格式,其中atom是Atom对象的名称,而function是返回Atom新值的函数。
  • reset!:将Atom的值设置为新值。它遵循(reset! atom value)格式,其中atom是Atom对象的名称,而value是该Atom对象的新值。
  • compare-and-set!:如果实际值与参数所传递的值相同,则以原子方式改变Atom对象的值。它遵循(compareand-set! atom old-value new-value)格式,其中atom是Atom对象的名称,old-value是Atom对象预期的实际值,而new-value是想要指派给Atom对象的新值。
    下面介绍一个操作Atom对象的示例。首先,声明一个名为company的函数,它接收两个名为accountsalary的参数。稍后将看到,account是一个Atom对象,而salary是一个数值。我们使用swap!函数增加account对象的值。然后,在控制台中输出执行该函数的线程信息,并使用@(dereferencing)函数来输出该Atom对象的实际值。
  1. (ns example.example2)
  2. (defn company ( [account salary]
  3. (swap! account + salary)
  4. (println (format "%s : %d"(Thread/currentThread) @account))
  5. ))

然后,创建一个名为user的类似函数。它接收Atom对象account对象和另一个名为money的变量作为参数。我们仍然使用swap!函数,不过在这种情况下,是为了减小Atom对象的值。

  1. (defn user ( [account money]
  2. (swap! account - money)
  3. (println (format "%s : %d"(Thread/currentThread) @account))
  4. ))

然后,创建一个名为myTask的函数,它接收一个名为account的Atom对象作为参数,并调用company函数1000次(值为100),而且user函数的值为100,这样account对象的最终值应该是相同的。

  1. (defn myTask ( [account]
  2. (dotimes [i 1000]
  3. (company account 100)
  4. (user account 100)
  5. (Thread/sleep 100)
  6. )))

最后,将myAccount对象创建为一个Atom对象(其初始值为0),并创建10个线程来执行myTask函数。

  1. (def myAccount (atom 0))
  2. (dotimes [i 10] (.start (Thread. (fn[] (myTask myAccount)))))

下面的屏幕截图显示了这个例子的执行情况。

13.1 Clojure的并发处理 - 图2

在该图中,可以看到运行myTask函数的不同线程,而且Atom对象myAccount的最终值如预期那样为0。

  • Agent对象

Agent是在将来某个时刻异步更新的引用。它在整个生命周期中都与某个存储位置相关联,而你只能改变该位置的值。Agent是一种不协调的数据结构。

可以通过以下函数使用Agent。

  • agent:建立一个新的Agent对象。
  • send:确定Agent的新值。它遵循(send agent function value)语法,其中agent是我们想修改的Agent的名称,function是为计算Agent新值所要执行的函数,而value是Agent的实际值,将其传递给function可以计算Agent的新值。
  • send-of:当想要使用函数来更新一个阻塞型函数的值(例如,读取一个文件)时,可以使用该函数。send-of函数将立即返回,而且用于更新Agent值的函数将在另一个线程中继续执行。它遵循与send函数相同的语法。
  • await:等待(阻塞当前线程),直到Agent所有未完成的操作完成为止。它遵循语法(await agent),其中agent是要等待的Agent的名称。
  • await-for:对于实际的Agent,你可以使用该函数等待其参数指定的毫秒数。该函数返回一个布尔值,以指示Agent是否已被更新。它遵循语法(await-for time agent),其中agent是Agent的名称,而time是要等待的毫秒数。
  • agent-error:如果Agent出现故障,则返回Agent抛出的异常。它遵循语法(agent-error agent),其中agent是Agent的名称。
  • shutdown-agents:结束处于运行状态的Agent的执行。该函数遵循(shutdown-agents)语法。
    下面来看一个例子,感受一下如何使用Agent。

首先,创建一个Agent,其初始值为300。

  1. (ns example.example3)
  2. (def myAgent (agent 300))

然后,实现一个名为myTask的函数。我们将重复如下过程:首先使用send方法将Agent的值增加1000倍,然后用send方法将其递减,这样Agent的最终值就应该是相同的。

  1. (defn myTask ( [a]
  2. (dotimes [i 1000]
  3. (send a + 100)
  4. (send a - 100)
  5. (println (format "%s : %d"(Thread/currentThread) @a))
  6. (Thread/sleep 100)
  7. )))

最后,创建10个线程来执行myTask函数。

  1. (dotimes [i 10] (.start (Thread. (fn[] (myTask myAgent)))))

下面的屏幕截图显示了执行本例时的输出。

13.1 Clojure的并发处理 - 图3

在该屏幕截图中可以看到,有不同的线程执行myTask函数,而且Agent的值像预期那样最后达到300。

13.1.3 Ref对象

最后,来看看Ref对象。这类对象是Clojure中唯一的协调引用类型,也是一种同步数据结构。这类对象允许在事务处理中并发地修改多个引用,因此要么所有引用都被修改,要么任何一个引用都不被修改。

可以使用下述函数操作Ref对象。

  • ref:创建一个新的Ref对象。
  • alter:该函数以安全方式修改引用值的取值。它遵循语法(alter ref function),其中,ref是要修改的Ref对象名称,而fucntion是用于获取该引用的新值的函数。
  • ref-set:该集合确定了Ref对象的值。它遵循语法(ref-set ref value),其中ref是要修改的Ref对象的名称,而value是Ref对象的新值。
  • conmute:该函数也可改变Ref的值,它遵循语法(conmute ref function),其中ref是想要修改的Ref对象的名称,而function则是计算Ref新值的函数。
  • dosync:以事务处理的方式执行参数所传递的表达式。如果在表达式执行期间发生异常,则不会执行与Ref对象相关的操作。另一方面,alter函数和commuted函数都必须在dosync函数内部执行。它遵循语法(dosync expression),其中expression是待执行的表达式。

下面看一个操作Ref对象的例子。

首先,声明名为account1account2的两个对象,并且将它们初始化为0。

  1. (ns example.example4)
  2. (def account1 (ref 0))
  3. (def account2 (ref 0))

然后,定义一个名为myTask的函数,它将收名为sourcedestination的两个Ref对象作为参数。我们减小source的值并增加destination的值1000次,就像两个银行账户之间的交易一样。我们使用alter函数来改变Ref对象的值,因此在dosync函数中必须包含对它的两次调用。

  1. (defn myTask ( [source, destination]
  2. (dotimes [i 1000]
  3. (dosync
  4. (alter source - 100)
  5. (alter destination + 100)
  6. )
  7. (println (format "%s : %d - %d"(Thread/currentThread)
  8. @source @destination))
  9. (Thread/sleep 100)
  10. )))

最后,创建10个线程来调用myTask函数,其中源是account1,目标是account2;再创建另外10个线程来调用myTask函数,其中源是account2,目标是account1

  1. (dotimes [i 10] (.start (Thread. (fn[] (myTask account1 account2)))))
  2. (dotimes [i 10] (.start (Thread. (fn[] (myTask account2 account1)))))

下面的屏幕截图显示了执行这个例子时的输出。

13.1 Clojure的并发处理 - 图4

在该屏幕截图中,可以看到执行myTask函数的不同线程,以及两个引用的最终值像预想的那样为0。

13.1.4 Delay

Delay是一种数据结构,当其被解引用后才进行首次计算以获取值。可以使用下述函数操作Delay。

  • delay:使用该函数声明一个新的Delay。
  • @:这是解引用函数。可以使用它读取Delay的值。
  • realized?:该函数将返回一个布尔值,用于指示Delay是否已初始化。

下面来看一个关于Delay的例子。

首先,声明名为nowotherNowlater的三个对象。在这三个对象中,我们将存储一个含有当前日期的字符串。later对象将被定义为一个Delay。

  1. (ns example.example5)
  2. (def now (.toString (java.util.Date.)))
  3. (def otherNow (.toString (java.util.Date.)))
  4. (def later (delay (.toString (java.util.Date.))))

然后,定义myTest函数。首先,输出now变量的值。然后,将当前线程休眠5秒钟,然后再输出otherNow变量和later变量的值。对于later变量,必须使用解引用函数获得它的值。

  1. (defn myTest ([]
  2. (println (format "%s" now))
  3. (Thread/sleep 5000)
  4. (println (format "%s : %s" otherNow @later))
  5. ))
  6. (myTest)

下面的屏幕截图显示了执行这个例子时的输出结果。

13.1 Clojure的并发处理 - 图5

在该屏幕截图中,可以看到Delay的值一直没有初始化,直到使用解引用函数后才获得值。

13.1.5 Future

Future是在另一个线程中计算的一段代码。可以使用下面的函数操作Future。

  • future:使用该函数创建一个新的Future。
  • realized?:使用该函数可检验Future是否已执行完成。
  • 解引用函数(@):使用该函数可获得Future的值。调用解引用函数阻塞当前线程,直到Future执行完成并返回值为止。
  • deref:使用该函数阻塞当前线程一段时间。如果该时间间隔结束后Future仍未完成执行,那么该函数返回。

下面看一个使用Future的例子。

首先,声明一个名为initializeEnv的函数,该函数让其执行线程休眠1秒钟。该函数输出关于执行这段代码的线程的信息,最后返回"Ok"值。

  1. (ns example.example6)
  2. (def initializeEnv ( future
  3. (println (format "%s : Initializing environment"(Thread/currentThread)))
  4. (Thread/sleep 1000)
  5. (println (format "%s : Environment initialized"(Thread/currentThread)))
  6. "Ok"
  7. ))

然后,声明另一个名为initilizeApp的函数。该函数与initializeEnv函数等价,只不过它将使执行线程休眠3秒钟。

  1. (def initializeApp ( future
  2. (println "Initializing app")
  3. (Thread/sleep 3000)
  4. (println "Environment app")
  5. "Ok"
  6. ))

最后,使用一组命令调用realized?函数和解引用函数。

  1. (println (realized? initializeEnv))
  2. (println (realized? initializeApp))
  3. (println @initializeEnv)
  4. (println (realized? initializeEnv))
  5. (println (realized? initializeApp))
  6. (println @initializeApp)

执行该代码时,可以看到两个Future在同一时间启动执行,initializeEnv函数首先结束执行,而且initializeEnv将向realized?函数返回true值。然后,initilizeApp函数将结束其执行。

13.1.6 Promise

Promise是与Future相类似的一种机制。主要的区别在于它并不会计算某段代码;你要显式地确定它的值。可用于Promise的函数如下所示。

  • promise:使用该函数可创建一个新的Promise。
  • realized?:使用该函数可以检查Promise是否有值。
  • 解引用函数(@):使用该函数可以获取Promise的值。调用解引用函数阻塞当前线程,直到Promise完成其执行并且返回值为止。
  • deref:使用该函数来阻塞当前线程一段时间。如果这段时间结束且Promise尚未完成执行,则该函数返回。
  • deliver:使用该函数来确定Promise的返回值。

让我们看一个使用Promise的例子。首先,定义一个名为myPromise的新Promise。

  1. (ns example.example7)
  2. (def myPromise (promise))

然后,创建一个名为myTest的函数,它将接收一个Promise作为参数。等待5秒钟,然后在验证该Promise没有值之后,使用deliver函数为其确定值。

  1. (defn myTest ([p]
  2. (def now (java.util.Date.))
  3. (println (format "Start : %s" now))
  4. (Thread/sleep 5000)
  5. (def now (java.util.Date.))
  6. (println (format "End : %s" now))
  7. (println (realized? p))
  8. (deliver p "ok")
  9. ))

最后,启动一个线程来执行myTest函数,并且使用realized?函数和解引用函数来验证Promise是否有值,并且将其输出。

  1. (def now (java.util.Date.))
  2. (println (format "Main : %s" now))
  3. (println (realized? myPromise))
  4. (println @myPromise)
  5. (def now (java.util.Date.))
  6. (println (format "Main : %s" now))
  7. (println (realized? myPromise))

下面的屏幕截图展示了执行本例后的输出结果。

13.1 Clojure的并发处理 - 图6