14.5 高级日志——最后一些信息和网络目标地址
我们会介绍两种可以帮助提供更有用的调试信息的更高级技术。第 1 种技术是log tail:,这是一些重大事件前最后几个日志消息的缓冲区,其目的是用一个小文件查看应用程序终止前的最后几条日志消息。这有点像将OS的tail命令自动应用到完整的日志输出中。
第2种技术是用日志框架提供的功能将日志信息通过网络发送给集中的日志处理服务。这种技术可以用来整合来自不用服务器的日志信息。我们需要为日志创建发送者和接收者。
14.5.1 创建自动的tail缓冲区
log tail缓冲区是logging框架的一个扩展。我们会通过扩展MemoryHandler略微改变它的行为。MemoryHandler内置的行为包括3种写入模式:当缓冲区满时,写入到另一个handler;当logging关闭时,写入所有缓冲的消息;更重要的是,当某个给定级别的消息被记录时,写入整个缓冲区。
我们会稍微修改第1种写入模式。我们会将最老的信息从缓冲区中移除,其他的信息仍然会留在缓冲区中,而不是当缓冲区满时写入到另外一个handler中,其他两种写入模式保持不变。这样做的结果是会写入logging关闭之前和错误发生之前的一些信息。
我们通常在一个更高级别的错误信息被记录前,将内存处理程序配置为一直缓冲消息的状态。这样做的目的是为了当错误发生时会写入缓冲区中的信息。
为了能够理解这个例子,重要的是定位到Python的安装位置,详细地理解logging.handlers模块。
基于TailHandler类创建时定义的容量,MemoryHandler的这个扩展会保存最后的几条信息。
class TailHandler(logging.handlers.MemoryHandler):
def shouldFlush(self, record):
"""
Check for buffer full or a record at the flushLevel or higher.
"""
if record.levelno >= self.flushLevel: return True
while len(self.buffer) >= self.capacity:
self.acquire()
try:
del self.buffer[0]
finally:
self.release()
我们扩展了MemoryHandler,它会累积日志消息直到超出设定的容量。当达到最大容量时,添加新消息时会移除旧消息。请注意,我们必须锁定数据结构允许多线程访问日志。
如果收到一条特定级别的消息,那么整个结构都会写入到目标处理程序中。通常,这个目标处理程序是FileHandler,为了调试需要和技术支持,这个处理程序会将消息写入文件的尾部。
另外,当logging关闭时,最后的一些消息也会被写入文件尾部。这意味着程序正常结束,不需要调试或者技术支持。
通常,我们会向这种类型的处理程序发送一条DEBUG消息,这样就可以得到很多程序崩溃的细节。在配置过程中应该显示地将级别设置为DEBUG,而不是使用默认的级别。
下面是一个使用TailHandler的配置。
version: 1
disableexistingloggers: False
handlers:
console:
class: logging.StreamHandler
stream: ext://sys.stderr
formatter: basic
tail:
(): __main.TailHandler
target: cfg://handlers.console
capacity: 5
formatters:
basic:
style: "{"
format: "{levelname:s}:{name:s}:{message:s}"
loggers:
test:
handlers: [tail]
level: DEBUG
propagate: False
root:
handlers: [console]
level: INFO
TailHandler的定义向我们展示了logging其他的一些可配置功能,它向我们展示了在配置文件中使用类引用和其他元素。
在配置中,我们引用了一个自定义类。前面配置文件中的cgf://handlers.console文本引用了定义在配置文件的handlers部分中的console处理程序。出于演示目的,我们让tail的target指向的StreamHandler使用sys.stderr。如前所述,另外一种可能的设计是使用指向调试文件的FileHandler。
我们为使用tail处理程序的日志记录器创建了test层次结构。写入到这些日志记录器中的消息会被缓存,只有在发生错误或者logging关闭时才会显示。
下面是一个演示脚本。
logging.config.dictConfig( yaml.load(config8) )
log= logging.getLogger( "test.demo8" )
print( "Last 5 before error" )
for i in range(20):
log.debug( "Message {:d}".format(i) )
log.error( "Error causes dump of last 5" )
print( "Last 5 before shutdown" )
for i in range(20,40):
log.debug( "Message {:d}".format(i) )
logging.shutdown()
在错误发生前,我们生成了20条消息。然后,在关闭logging和刷新缓存区前我们又生成了20条消息。这段脚本会产生类似下面这样的输出。
Last 5 before error
DEBUG:test.demo8:Message 16
DEBUG:test.demo8:Message 17
DEBUG:test.demo8:Message 18
DEBUG:test.demo8:Message 19
ERROR:test.demo8:Error causes dump of last 5
Last 5 before shutdown
DEBUG:test.demo8:Message 36
DEBUG:test.demo8:Message 37
DEBUG:test.demo8:Message 38
DEBUG:test.demo8:Message 39
tail处理程序默认忽略中介消息。由于设置的最大容量是5,因此错误发生(或者关闭logging)之前的最后5条消息会被显示。
14.5.2 发送日志消息到远程的进程
一种高性能的设计模式是用多个进程处理一个单一的问题。我们的应用程序可能部署在多个应用程序服务器上或者是使用多个数据库客户端。对于这种架构,我们通常希望为不同进程提供集中的日志处理机制。
用来创建统一日志的一种技术是包含准确的时间戳,然后将来自多个日志文件的记录整理为一个统一的日志。通过远程地从多个并发的生产者进程将日志记录到一个消费者进程,就可以避免分类和整合这些额外的处理过程。
我们的共享日志解决方案会使用multiprocessing的共享日志。更多关于多进程的信息,参见第12章“传输和共享对象”。
建立多进程应用程序需要3个步骤。
- 首先,创建共享队列对象,这样一来日志消费者就可以筛选消息。
- 其次,创建从队列中获取日志记录的消费者进程。
- 最后,我们会创建用于处理真正应用程序工作的生产者进程池,这些进程会将记录写入共享队列。
ERROR和FATAL消息可以通过SMS和E-mail向关心它们的用户提供即时通知。消费者也可以以相对慢的速度处理和整合日志文件有关的工作。
创建生产者和消费者的顶层父应用程序和Linux中启动多个OS级别进程的init程序类似。如果我们使用和init一样的设计模式,那么父应用程序可以监视多个子生产者程序,观察它们是否崩溃。同时,它也可以记录相关的错误甚至尝试重启程序。
下面是消费者进程的定义。
import collections
import logging
import multiprocessing
class LogConsumer1(multiprocessing.Process):
"""In effect, an instance of QueueListener."""
def init( self, queue ):
self.source= queue
super().init()
logging.config.dictConfig( yaml.load(consumerconfig) )
self.combined= logging.getLogger(
"combined." + self.class.qualname )
self.log= logging.getLogger( self._class.__qualname )
self.counts= collections.Counter()
def run( self ):
self.log.info( "Consumer Started" )
while True:
log_record= self.source.get()
if log_record == None: break
self.combined.handle( log_record )
words= log_record.getMessage().split()
self.counts[words[1]] += 1
self.log.info( "Consumer Finished" )
self.log.info( self.counts )
这个进程是multiprocessing.Process的子类,我们会用start()方法启动它,基类会启动一个子进程运行run()方法。
当进程运行时,它会从队列中获取日志记录并将它们转发给一个日志记录器实例。在本例中,我们会创建一个特殊的日志记录器,它会以父类的名字combined.命名,源进程中的所有记录都会写入这个日志记录器。
另外,基于每条信息的第2个单词,我们会提供一些计数信息。在本例中,我们已经将信息文本中的第2个单词设计为进程的ID号码。这个计数会向我们展示有多少条信息被成功处理过。
下面是一个为这个过程创建的配置文件。
version: 1
disable_existing_loggers: False
handlers:
console:
class: logging.StreamHandler
stream: ext://sys.stderr
formatter: basic
formatters:
basic:
style: "{"
format: "{levelname:s}:{name:s}:{message:s}"
loggers:
combined:
handlers: [ console ]
formatter: detail
level: INFO
propagate: False
root:
handlers: [ console ]
level: INFO
我们使用基本格式定义了简单的命令行Logger,也用combined.为开头的名字定义了记录器层次结构的顶层。这些记录器会被用于显示合并后的来自于多个生产者的输出。
下面是日志生产者。
class LogProducer(multiprocessing.Process):
handlerclass= logging.handlers.QueueHandler
def init( self, procid, queue ):
self.procid= procid
self.destination= queue
super()._init()
self.log= logging.getLogger(
"{0}.{1}".format(self.__class.__qualname, self.proc_id) )
self.log.handlers = [ self.handler_class( self.destination ) ]
self.log.setLevel( logging.INFO )
def run( self ):
self.log.info( "Producer {0} Started".format(self.proc_id) )
for i in range(100):
self.log.info( "Producer {:d} Message {:d}".format(self.
proc_id, i) )
self.log.info( "Producer {0} Finished".format(self.proc_id) )
生产者没有做太多配置。它只是简单地用类全名和实例标识符(self.proc _ id)获取一个日志记录器。它将处理程序的列表设置为QueueHandler,它只是对Queue的一个封装实例,这个日志记录器的级别被设定为INFO。
我们将handler _ class定义为类的一个属性,因为我们计划改变它。对于第1个例子而言,它会是logging.handlers.QueueHandler。对于后一个例子而言,我们会使用另外一个类。
真正完成这个工作的进程会用这个日志记录器创建日志信息。这些信息会加入队列中,等待中央消费者处理。在本例中,这个进程简单地用最快的速度清空缓冲区中的102条消息。
下面是我们如何启动消费者和生产者,我们会用一些小步骤展示这个过程。首先,我们创建队列。
import multiprocessing
queue= multiprocessing.Queue(100)
这个队列太小,无法在瞬间清空10个生产者的102条信息。这里用小队列的原因是当消息丢失时可以查看原因,下面是如何启动消费者进程的代码。
consumer = Log_Consumer_1( queue )
consumer.start()
下面是我们如何启动生产者进程数组。
producers = []
for i in range(10):
proc= Log_Producer( i, queue )
proc.start()
producers.append( proc )
和我们预期的一样,10个并发的生产者将会让队列溢出。每个生产者都会收到一些队列慢的异常,这些异常就意味着消息丢失了。
下面是我们如何正确地结束这个处理过程。
for p in producers:
p.join()
queue.put( None )
consumer.join()
首先,我们等待每个生产者进程结束,然后加入父进程。其次,我们将一个哨兵对象插入队列中,这样消费者进程就会正确结束。最后,我们等待消费者进程结束并加入父进程。
14.5.3 防止队列溢出
日志模块默认的行为是用Queue.put _ nowait()方法将信息插入队列中。这么做的优点是允许生产者的运行不受记录日志带来的延迟影响。这样做的缺点是,在最坏情况下,如果队列太小而无法处理大量涌入的日志消息,那么这些消息就会丢失。
我们有两种方法能够合理地处理这种消息大量涌入的情况。
- 我们可以把Queue换成大小没有限制的SimpleQueue.SimpleQueue。由于它的API有一些不同,我们需要扩展QueueHandler,用Queue.put()替换Queue.put _ nowait()。
- 当队列满这种少见的情形发生时,可以减慢生产者的速度,只需要稍微修改QueueHandler,用Queue.put()替代Queue.put _ nowait()。
有趣的是,相同的API修改可以同时适用于Queue和SimpleQueue。下面是修改的代码。
class WaitQueueHandler( logging.handlers.QueueHandler ):
def enqueue(self, record):
self.queue.put( record )
我们用一个不同的Queue的方法替换了enqueue()的方法体。现在,我们可以使用SimpleQueue或者Queue。如果使用Queue,当队列满时它会等待,从而避免了丢失消息。如果使用SimpleQueue,队列的尺寸会稍微增大来保存所有的消息。
下面是修改后的生产者类。
class Log_Producer_2(Log_Producer):
handler_class= WaitQueueHandler
这个类使用了我们的新WaitQueueHandler。这个生产者的其他部分与前面版本相同。
其余的用于创建Queue和启动消费者的脚本保持不变。生产者都是Log Producer 2的实例,但是,用于启动和加入的脚本与第1个例子中使用的代码相同。
这个版本运行得更慢,但是不会丢失任何消息。我们可以通过创建更大的队列容量来提高性能。如果我们创建可以容纳1020条消息的队列,那么本例中的性能就会达到最优。找到一个最优的队列容量需要进行仔细的实验。
