12.7 用消息队列传输对象

    multiprocessing模块用于序列化和传输对象。我们可以用队列和管道序列化对象,然后传输到其他进程中。有许多第三方的项目都提供了完整的消息队列的实现。我们这里着重介绍multiprocess队列,因为它是Python内置的一个非常优秀的队列。

    对于要求高性能的应用程序,可能需要一个更快的消息队列。使用比 pickling 更快的序列化技术可能也是必需的。但是,在本章中,我们只关注 Python 的设计问题。multiprocessing模块基于pickle来编码对象。关于这方面的更多信息可以参见第9章“序列化和保存——JSON、YAML、Pickle、CSV和XML”。我们无法简单地提供一个带有限制的unpickler,这个模块为我们提供了一些相对简单的安全措施以防止unpickle问题。

    当使用multiprocessing时,有一个很重要的设计问题需要考虑:通常最好禁止多个进程(或者线程)同时更改共享对象。同步和锁的问题非常严重(并且很容易出错),所以有一个笑话是。

    当面对一个问题时,程序员会想,“我要用多线程。”

    通过RESTful的Web服务或者multiprocessing使用进程级别的同步可以避免同步问题,因为没有共享对象。基本的设计原则是将处理过程看作一个由许多离散的步骤组成的管道。每个处理步骤都有一个输入队列和一个输出队列,这个步骤会获取一个对象、执行一些处理过程,然后更新对象。

    multiprocessing的理念和POSIX中shell管道的概念类似,这种管道命令通常写作process1 | process2 | process3。这种shell管道涉及3个通过管道互相连接的并发进程。这种管道与multiprocessing的主要不同是,在使用multiprocessing时不需要使用STDIN、STDOUT并且显式地序列化对象。我们可以相信multiprocessing模块对操作系统级基础事务的处理。

    POSIX shell管道有一些限制,每个管道中只有一个生产者和一个消费者。Python的multiprocessing模块允许我们创建包含多个消费者的消息队列。这允许我们定义可以从源进程向多个sink进程发散的管道。队列能包含多个消费者让我们可以通过一个单独的sink进程创建一个可以对多个源进程结果进行合并的管道。

    为了最大化计算机系统的吞吐量,我们需要有足够的等待中的工作,这样就不会有空闲的处理器或者内核。当任意给定的操作系统进程在等待一个资源时,应该至少有另一个进程准备运行。

    例如,回到模拟游戏中,我们需要通过多次使用玩家策略或者下注策略(或者都使用)收集重要的模拟统计数据。这里的做法是创建一个处理请求的队列,这样我们计算机的处理器(和核心)就可以充分参与处理我们的模拟操作。

    每个处理中的请求都可以作为一个Python对象。multiprocessing模块会序列化对象,这样它就可以通过队列传输到另外一个进程中。

    在第14章“Logging和Warning模块”中,当我们介绍logging模块如何使用multi processing队列为每个生产者进程提供单一、集中的日志时,会再次讲解这点。在这些例子中,在进程间传递的对象是logging.LogRecord实例。

    12.7.1 定义进程

    我们必须把每个处理步骤设计成一个简单的循环,从队列中读取请求、处理请求,然后将结果传递给另外一个队列。这样的设计将很大的问题分解为若干阶段,这些阶段组成了一个管道。由于这些阶段会并发执行,因此我们可以最大化地利用系统资源。此外,由于这些阶段只包含简单的获取数据和将结果存入独立队列中的操作,因此不会再有复杂的锁或者共享对象带来的各种问题。一个进程可以是一个简单的函数或者一个可回调对象。我们会主要介绍进程定义为multiprocessing.Process的子类,这样的做法为我们提供了最大的灵活性。

    回到我们的模拟游戏,我们可以将模拟过程分解为3个步骤的管道。

    1.一个全局的驱动者将模拟请求放入处理队列中(processing queue)。

    2.处于模拟器池的模拟器会从处理队列中获取请求,执行模拟操作,然后将结果放入结果队列(results queue)中。

    3.摘要者(summarizer)会从结果队列中获取结果,然后创建一个最终结果的列表。

    使用进程池让我们能够并发执行CPU可以接受的最大数量的模拟操作,模拟器池可以进行适当配置,来确保模拟操作能以最快速度执行。

    以下是模拟器进程的定义。

    import multiprocessing
    class Simulation( multiprocessing.Process ):
       def init( self, setupqueue, resultqueue ):
         self.setupqueue= setupqueue
         self.resultqueue= resultqueue
         super().init()
       def run( self ):
         """Waits for a termination"""
         print( self.class.__name
    , "start" )
         item= self.setup_queue.get()
         while item != (None,None):
           table, player = item
           self.sim= Simulate( table, player, samples=1 )
           results= list( self.sim )
           self.result_queue.put( (table, player, results[0]) )
           item= self.setup_queue.get()
         print( self.__class
    .__name
    , "finish" )

    我们扩展了multiprocessing.Process。这意味着我们必须做两件事情确保能够和multiprocessing交互:我们必须保证super().init()会被执行,并且必须重载run()

    run()的方法体中,我们使用了两个队列。setup_queue队列实例会包含TablePlayer对象的两个元组。进程会利用这两个对象执行模拟操作。它会将生成的3个元素的元组作为结果放入result_queue队列实例中。Simulate类的API如下。

    class Simulate:
       def init( self, table, player, samples ):
       def iter( self ): yields summaries

    迭代器会根据samples请求的数量,依次返回对应的统计结果。我们还在setup_queue中包含了一个哨兵对象(sentinel object)。这个对象用来以一种优雅的方式终止当前的处理过程。如果不用哨兵对象,我们就会被迫终止进程,这有可能破坏锁和其他的系统资源。以下是Summarize进程的代码。

    class Summarize( multiprocessing.Process ):
       def init( self, queue ):
         self.queue= queue
         super().init()
       def run( self ):
         """Waits for a termination"""
         print( self.class.name, "start" )
         count= 0
         item= self.queue.get()
         while item != (None, None, None):
           print( item )
           count += 1
           item= self.queue.get()
         print( self.class.name, "finish", count )

    这个类也扩展了multiprocessing.Process。在本例中,我们从队列中获取对象,然后简单地计算它们的总数。一个更有用的进程可能会用一些collection.Counter对象来累加一些更有趣的统计数据。

    Simulation类一样,我们也会检测哨兵对象然后优雅地关闭处理进程。使用哨兵对象让我们在进程完成工作时及时地关闭处理过程。在一些应用程序中,子进程可能不会被关闭,并且会一致运行下去。

    12.7.2 创建队列和提供数据

    创建队列包括创建multiprocessing.Queue或者它的某个子类的实例。在本例中,我们可以用以下方式创建队列。

    setup_q= multiprocessing.SimpleQueue()
    results_q= multiprocessing.SimpleQueue()

    我们创建了两个定义处理管道的队列。当我们将一个模拟请求放入setup_q队列中时,我们预期Simulation进程会取出这个请求并且执行模拟过程。这个步骤应该生成由3个元素组成的元组作为结果,包括台面、玩家和results_q队列中的结果。这个三元组的结果应该反过来指定Summarize进程需要完成的工作。以下是我们如何开始一个Summarize进程。

    result= Summarize( results_q )
    result.start()

    下面是我们如何创建4个并发的模拟进程。

    simulators= []
    for i in range(4):
       sim= Simulation( setup_q, results_q )
       sim.start()
       simulators.append( sim )

    这4个并发的进程会竞争工作资源,每个都会试图从等待请求的队列中获取下一个请求。一旦这4个模拟器满载,未处理的请求会开始填充队列。一旦队列和进程都进入等待,驱动者函数就可以开始将请求存入setup_q队列中。以下代码中的循环会生成大量的请求。

    table= Table( decks= 6, limit= 50, dealer=Hit17(),
      split= ReSplit(), payout=(3,2) )
    for bet in Flat, Martingale, OneThreeTwoSix:
       player= Player( SomeStrategy, bet(), 100, 25 )
       for sample in range(5):
          setup_q.put( (table, player) )

    我们创建了一个Table对象,为3种下注策略各创建一个Player对象并且将一个模拟请求插入到队列中。Simulation对象会从队列中获取并处理pickled的双元组。为了能够有序地终止这4个模拟器,需要为每个模拟器定义哨兵对象并插入队列中。

    for sim in simulators:
       setup_q.put( (None,None) )

    for sim in simulators:
       sim.join()

    我们在队列中为每个模拟器各添加一个可以消费的哨兵的对象。一旦所有的模拟器都消费了哨兵对象,我们等待进程结束然后回到父进程中。

    一旦Process.join()操作结束,不会再创建任何模拟数据。我们可以在模拟操作的结果队列中也添加一个哨兵对象。

    results_q.put( (None,None,None) )
    result.join()

    一旦结果队列中的哨兵对象处理完成,Summarize进程会停止接受输入,这时我们可以使用join()将其返回父进程。

    我们用multiprocessing将对象从一个进程传输到另外一个进程,这让我们可以用一种相对简单的方法创建高性能、多重处理的数据管道。由于multiprocessing模块使用了pickle,因此对于对象行为的限制几乎无法通过管道实现。