环境:py3.8
原理:模仿wireshark ,利用python的scapy模块下的sniff()函数进行数据的抓取,并进行所谓的“消费者处理”即跟据OSI网络协议模型进行协议分析
。将整个程序精简的概括得到最关键的一句代码:
sniff(prn=lambda pkt: packet_consumer(pkt), stop_filter=lambda pkt: stop_sniff_event.is_set(),
filter=fitler_entry.get(),iface='Intel(R) Dual Band Wireless-AC 7265')
prn=lambda alt : packet_consummer(pkt)这个参数指对抓到的每一个包进行packet_consumer()函数处理即协议分析函数处理。
stop_filter = lambda apk :stop_sniff_event.is_set()这里主要是对sniff()函数的结束控制,如果stop_sniff_event这个我们定义的变量被执行了set()方法,那么我们就停止抓包。
filter = filter_entry.get() 这里是对我们抓的包进行bfp过滤),即过滤只留下我们想要的。后面的filter_entry.get()函数是将界面文本框中用户输入的过滤值拿出来,作为我们的搜索对象。
iface=‘网卡名’,指定网卡这里指物理网卡。
源码如下:
# coding=utf-8
import datetime
import threading
import tkinter
from tkinter import *
from tkinter import font, filedialog
from tkinter.constants import *
from tkinter.filedialog import askopenfilename
from tkinter.messagebox import askyesno
from tkinter.scrolledtext import ScrolledText
from tkinter.ttk import Treeview
from scapy.layers.inet import *
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import *
# 状态栏类
from scapy.sendrecv import sniff
#设一个线程事件变量
from scapy.utils import wrpcap
stop_sniff_event= threading.Event()
pause_sniff_event= threading.Event()
# 捕获总数
sniff_count = 0
# 所有捕获到的报文
sniff_array = []
class StatusBar(Frame):
def __init__(self, master):
Frame.__init__(self, master)
self.label = Label(self, bd=1, relief=SUNKEN, anchor=W)
self.label.pack(fill=X)
def set(self, fmt, *args):
self.label.config(text=fmt % args)
self.label.update_idletasks()
def clear(self):
self.label.config(text="")
self.label.update_idletasks()
# 时间戳转为格式化的时间字符串
def timestamp2time(timestamp):
time_array = time.localtime(timestamp)
mytime = time.strftime("%Y-%m-%d %H:%M:%S", time_array)
return mytime
def on_click_packet_list_tree(event):
"""
数据包列表单击事件响应函数,在数据包列表单击某数据包时,在协议解析区解析此数据包,并在hexdump区显示此数据包的十六进制内容
:param event: TreeView单击事件
:return: None
"""
global sniff_array
selected_item = event.widget.selection() # event.widget获取Treeview对象,调用selection获取选择对象名称
# 清空packet_dissect_tree上现有的内容
packet_dissect_tree.delete(*packet_dissect_tree.get_children())
# 设置协议解析区的宽度
packet_dissect_tree.column('Dissect', width=packet_list_frame.winfo_width())
# !!!!!!!!!!!!!!!测试用的数据包!!!!!!!!!!!!!!要求换成你抓到的数据包!!!!!!!!!!!!!!!!!!!
packet = sniff_array[int(selected_item[0]) - 1]
# 按照协议层次显示数据包
lines = (packet.show(dump=True)).split('\n')
last_tree_entry = None
for line in lines:
if line.startswith('#'):
line = line.strip('# ')
last_tree_entry = packet_dissect_tree.insert('', 'end', text=line)
else:
packet_dissect_tree.insert(last_tree_entry, 'end', t