因为涉及到进程间互斥与通信问题,因此默认情况下Python中的logging无法在多进程环境下打印日志。但是查询了官方文档可以发现,推荐了一种利用logging.SocketHandler的方案来实现多进程日志打印。
其原理很简单,概括一句话就是说:多个进程将各自环境下的日志通过Socket发送给一个专门打印日志的进程,这样就可以防止多进程打印的冲突与混乱情况。
本文主要记录下SocketHandler真实的用法情况:
1 时序图
简单说明下逻辑:主进程(MainProcess)启动一个专门打印日志的进程(LogReceiverProcess),并且将自己(主进程)环境下的日志都“重定向”给LogReceiverProcess。同理,在后续逻辑中启动的所有工作子进程(WorkerProcess)都做一样的操作,把自己环境下的日志都“重定向”给日志进程去打印。
2 实现代码
2.1 日志进程
日志进程的代码核心在于要建立一个TCP Server来接收并处理Log record,代码如下:
1 importos2 importlogging3 importlogging.handlers4 importtraceback5 importcPickle6 importstruct7 importSocketServer8 from multiprocessing importProcess9
10 classLogRecordStreamHandler(SocketServer.StreamRequestHandler):11 defhandle(self):12 whileTrue:13 try:14 chunk = self.connection.recv(4)15 if len(chunk) < 4:16 break
17 slen = struct.unpack(">L", chunk)[0]18 chunk =self.connection.recv(slen)19 while len(chunk) <20 chunk="chunk" self.connection.recv obj="self.unpickle(chunk)22" record="logging.makeLogRecord(obj)23" self.handle_log_record>
25 except:26 break
27
28 @classmethod29 defunpickle(cls, data):30 returncPickle.loads(data)31
32 defhandle_log_record(self, record):33 if self.server.logname is notNone:34 name =self.server.logname35 else:36 name =record.name37 logger =logging.getLogger(name)38 logger.handle(record)39
40
41 classLogRecordSocketReceiver(SocketServer.ThreadingTCPServer):42 allow_reuse_address = 1
43
44 def __init__(self, host='localhost', port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, handler=LogRecordStreamHandler):45 SocketServer.ThreadingTCPServer.__init__(self, (host, port), handler)46 self.abort =047 self.timeout = 1
48 self.logname =None49
50 defserve_until_stopped(self):51 importselect52 abort =053 while notabort:54 rd, wr, ex =select.select([self.socket.fileno()], [], [], self.timeout)55 ifrd:56 self.handle_request()57 abort =self.abort58
59
60 def_log_listener_process(log_format, log_time_format, log_file):61 log_file =os.path.realpath(log_file)62 logging.basicConfig(level=logging.DEBUG, format=log_format, datefmt=log_time_format, filename=log_file, filemode='a+')63
64 #Console log
65 console =logging.StreamHandler()66 console.setLevel(logging.INFO)67 console.setFormatter(logging.Formatter(fmt=log_format, datefmt=log_time_format))68 logging.getLogger().addHandler(console)69
70 tcp_server =LogRecordSocketReceiver()71
72 logging.debug('Log listener process started ...')73 tcp_server.serve_until_stopped()
View Code
关键点:
(1)TCPServer的构建逻辑,拆包还原Log记录;
(2)在日志进程中设定好logging记录级别和打印方式,这里除了指定文件存储还添加了Console打印。
2.2 其他进程
除了日志进程之外的进程,设置logging都“重定向”给日志进程,并且要关闭当前进程的日志在Console打印(默认会显示Warning级别及以上的日志到Console),否则Console上日志展示会有重复凌乱的感觉。
1 classLogHelper:2 #默认日志存储路径(相对于当前文件路径)
3 default_log_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'logs')4
5 #记录当前实际的日志所在目录
6 current_log_path = ''
7
8 #记录当前实际的日志完整路径
9 current_log_file = ''
10
11 #日志文件内容格式
12 log_format = '[%(asctime)s.%(msecs)03d][%(processName)s][%(levelname)s][%(filename)s:%(lineno)d] %(message)s'
13
14 #日志中时间格式
15 log_time_format = '%Y%m%d %H:%M:%S'
16
17 #日志进程
18 log_process =None19
20 def __init__(self):21 pass
22
23 @staticmethod24 defprint_console_log(level, message):25 print '--------------------------------------------------'
26 if level ==logging.WARN:27 level_str = '[WARN]'
28 elif level ==logging.ERROR:29 level_str = '[ERROR]'
30 elif level ==logging.FATAL:31 level_str = '[FATAL]'
32 else:33 level_str = '[INFO]'
34 print '\t%s %s' %(level_str, message)35 print '--------------------------------------------------'
36
37 @staticmethod38 def init(clear_logs=True, log_path=''):39 #40 console =logging.StreamHandler()41 console.setLevel(logging.FATAL)42 logging.getLogger().addHandler(console)43
44 try:45 #如果外部没有指定日志存储路径则默认在common同级路径存储
46 if log_path == '':47 log_path =LogHelper.default_log_path48 if notos.path.exists(log_path):49 os.makedirs(log_path)50 LogHelper.current_log_path =log_path51
52 #清理旧的日志并初始化当前日志路径
53 ifclear_logs:54 LogHelper.clear_old_log_files()55 LogHelper.current_log_file =LogHelper._get_latest_log_file()56
57 socket_handler = logging.handlers.SocketHandler('localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT)58 logging.getLogger().setLevel(logging.DEBUG)59 logging.getLogger().addHandler(socket_handler)60
61 #62 LogHelper.start()63
64 exceptException, ex:65 LogHelper.print_console_log(logging.FATAL, 'init() exception: %s' %str(ex))66 traceback.print_exc()67
68 @staticmethod69 defstart():70 if LogHelper.log_process isNone:71 LogHelper.log_process = Process(target=_log_listener_process, name='LogRecorder', args=(LogHelper.log_format, LogHelper.log_time_format, LogHelper.current_log_file))72 LogHelper.log_process.start()73 else:74 pass
75
76 @staticmethod77 defstop():78 if LogHelper.log_process isNone:79 pass
80 else:81 LogHelper.log_process.terminate()82 LogHelper.log_process.join()83
84 @staticmethod85 def_get_latest_log_file():86 latest_log_file = ''
87 try:88 ifos.path.exists(LogHelper.current_log_path):89 for maindir, subdir, file_name_list inos.walk(LogHelper.current_log_path):90 for file_name infile_name_list:91 apath =os.path.join(maindir, file_name)92 if apath >latest_log_file:93 latest_log_file =apath94
95 if latest_log_file == '':96 latest_log_file = LogHelper.current_log_path + os.sep + 'system_'
97 latest_log_file += time.strftime("%Y%m%d_%H%M%S", time.localtime(time.time())) + '.log'
98
99 exceptException, ex:100 logging.error('EXCEPTION: %s' %str(ex))101 traceback.print_exc()102
103 finally:104 returnlatest_log_file105
106 @staticmethod107 defget_log_file():108 returnLogHelper.current_log_file109
110 @staticmethod111 defclear_old_log_files():112 if notos.path.exists(LogHelper.current_log_path):113 logging.warning('clear_old_log_files() Not exist: %s' %LogHelper.current_log_path)114 return
115
116 try:117 for maindir, subdir, file_name_list inos.walk(LogHelper.current_log_path):118 for file_name infile_name_list:119 apath =os.path.join(maindir, file_name)120 if apath !=LogHelper.current_log_file:121 logging.info('DEL -> %s' %str(apath))122 os.remove(apath)123 else:124 with open(LogHelper.current_log_file, 'w') as f:125 f.write('')126
127 logging.debug('Clear log done.')128
129 exceptException, ex:130 logging.error('EXCEPTION: %s' %str(ex))131 traceback.print_exc()
View Code
20>