读取事件日志回报PyEventLogRecords(包装[MS.Docs]:_EVENTLOGRECORD 结构), 尽管事件渲染期望(你需要合作)PyHANDLEs (PyEVT_HANDLEs(包装EVT_HANDLE ([MS.Docs]:Windows 事件日志数据类型)更准确地说))。
所以,为了得到XML数据,您需要使用适用于此类型的函数系列:例如EvtQuery, EvtNext.
code.py:
#!/usr/bin/env python3
import sys
import pywintypes
import win32evtlog
INFINITE = 0xFFFFFFFF
EVTLOG_READ_BUF_LEN_MAX = 0x7FFFF
def get_record_data(eventlog_record):
ret = dict()
for key in dir(eventlog_record):
if 'A' < key[0] < 'Z':
ret[key] = getattr(eventlog_record, key)
return ret
def get_eventlogs(source_name="Application", buf_size=EVTLOG_READ_BUF_LEN_MAX, backwards=True):
ret = list()
evt_log = win32evtlog.OpenEventLog(None, source_name)
read_flags = win32evtlog.EVENTLOG_SEQUENTIAL_READ
if backwards:
read_flags |= win32evtlog.EVENTLOG_BACKWARDS_READ
else:
read_flags |= win32evtlog.EVENTLOG_FORWARDS_READ
offset = 0
eventlog_records = win32evtlog.ReadEventLog(evt_log, read_flags, offset, buf_size)
while eventlog_records:
ret.extend(eventlog_records)
offset += len(eventlog_records)
eventlog_records = win32evtlog.ReadEventLog(evt_log, read_flags, offset, buf_size)
win32evtlog.CloseEventLog(evt_log)
return ret
def get_events_xmls(channel_name="Application", events_batch_num=100, backwards=True):
ret = list()
flags = win32evtlog.EvtQueryChannelPath
if backwards:
flags |= win32evtlog.EvtQueryReverseDirection
try:
query_results = win32evtlog.EvtQuery(channel_name, flags, None, None)
except pywintypes.error as e:
print(e)
return ret
events = win32evtlog.EvtNext(query_results, events_batch_num, INFINITE, 0)
while events:
for event in events:
ret.append(win32evtlog.EvtRender(event, win32evtlog.EvtRenderEventXml))
events = win32evtlog.EvtNext(query_results, events_batch_num, INFINITE, 0)
return ret
def main():
import sys, os
from collections import OrderedDict
standard_log_names = ["Application", "System", "Security"]
source_channel_dict = OrderedDict()
for item in standard_log_names:
source_channel_dict[item] = item
for item in ["Windows Powershell"]: # !!! This works on my machine (96 events)
source_channel_dict[item] = item
for source, channel in source_channel_dict.items():
print(source, channel)
logs = get_eventlogs(source_name=source)
xmls = get_events_xmls(channel_name=channel)
#print("\n", get_record_data(logs[0]))
#print(xmls[0])
#print("\n", get_record_data(logs[-1]))
#print(xmls[-1])
print(len(logs))
print(len(xmls))
if __name__ == "__main__":
print("Python {:s} on {:s}\n".format(sys.version, sys.platform))
main()
Notes:
- The 2 lists should have the same length. The nth entry in each of them should reference the same event (as long as both functions are called with same value for backwards argument (read below))
-
get_events_xmls:
- 返回一个列表XML与事件相关的 blob
- 错误处理不是最好的,你可以包装所有API来电
try
/ except
子句(我没有遇到错误,所以我不确定什么情况下会引发异常)
- 你可以玩一点[MS.Docs]:EvtNext 函数的参数 (Timeout and 活动规模用于性能微调;为我,~20k事件的处理方式为<10秒 - 其中文本打印和转换花费最多)
- In Python 3, the XMLs are bytes ([Python 3.Docs]:内置类型 -class bytes([源[,编码[,错误]]]))而不是普通字符串(我必须对它们进行编码,因为有些包含一些非ASCII字符,并尝试打印它们会引发Unicode编码错误)
- 事件过滤是可能的,检查[MS.Docs]:EvtQuery 函数的参数(Flags and Query)
- 请注意向后允许以相反(时间顺序)顺序遍历事件的参数(默认设置为True).
-
get_record_data:
- 这只是一个便利函数,它转换PyEventLogRecord对象变成Python字典
- 转换基于以下事实:我们关心的字段以大写字母开头(EventID