python3+pyqt5实践历程(一)——基于socket通信与pyqt5的串口工具

2023-10-26

python3+pyqt5实践历程(一)——基于socket通信与pyqt5的串口工具


系列文章目录

python3+tkinter实践历程(一)——基于requests与tkinter的API工具
python3+tkinter实践历程(二)——基于tkinter的日志检索工具
python3+tkinter实践历程(三)——基于requests与tkinter的模拟web登录工具
python3+tkinter实践历程(四)——模仿CRT完成基于socket通信与tkinter的TCP串口客户端

制作背景

① 在python3+tkinter实践历程(四)——模仿CRT完成基于socket通信与tkinter的TCP串口客户端中所实现串口工具,经过测试发现了巨大的缺陷,tkinter的Text控件在不断写入文本的时候也会不断增加工具占用的内存,导致会在windows内存占用满时,工具崩溃。
②由于没有找到tkinter框架下解决这个问题的办法,经过试验PYQT5的QTextBrowser控件不会产生此问题,于是用pyqt5框架重写整个工具,最终实现的功能与tkinter框架实现的完全一致。

最终功能

① 提供界面输入-TCP服务器的IP、端口
② 提供界面选项-保存日志的目录,成功连接TCP服务器后,自动将所有接收到的日志保存,按日期划分文件夹,按串口划分log文件。
③ 提供界面打印界面,自主选择串口打印,且与保存日志功能完全分离,互不干扰。
④ 打印日志的界面支持增、删、重命名。
⑤ 工具支持读取界面配置、记录历史配置
⑥ 支持给不同串口发送命令,发送命令框支持回车发送+按钮发送
⑦ 快速发送按钮支持读取配置
⑧ 提供选项-滚动条是否自动跟随新打印的日志,支持实时改变
⑨ 提供了自动检查关键字、定时推送告警信息至企业微信的功能

工具截图展示

在这里插入图片描述

代码详解

"""
串口工具
"""
from PyQt5.QtWidgets import QApplication, QWidget, QPushButton, QTextBrowser, QVBoxLayout, QHBoxLayout, QLabel,\
    QLineEdit, QRadioButton, QTabWidget, QComboBox, QTextEdit, QFileDialog, QDesktopWidget, QAction, QMainWindow
from PyQt5.QtCore import Qt, pyqtSignal, QThread
from functools import partial
import threading
import os
import datetime
import socket
import re
import time
import json
import sys
import logging.handlers
import subprocess
import requests


def ipping(ip, expect=''):
    """ping IP"""
    try:
        order = 'ping %s' % ip
        logger.info('执行命令:%s' % order)
        ftp_ret = subprocess.Popen(order, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        ret = ftp_ret.stdout.read()
        str_ret = ret.decode("gbk")
        logger.info(str_ret)
        if expect == 'false':
            target1 = '请求超时'
            target2 = '无法访问目标主机'
            ret_s = re.search(target1, str_ret)
            ret_s2 = re.search(target2, str_ret)
            if ret_s or ret_s2:
                logger.info(ret_s, ret_s2)
                logger.info('设备ip:%s,不能ping通' % ip)
                return False
            else:
                logger.info('设备ip:%s,能ping通' % ip)
                return True
        else:
            target = 'TTL'
            ret_s = re.search(target, str_ret)
            if ret_s:
                logger.info(ret_s)
                logger.info('设备ip:%s,能ping通' % ip)
                return True
            else:
                logger.info('设备ip:%s,不能ping通' % ip)
                return False
    except Exception as e:
        logger.info('ping:%s异常,%s' % (ip, e))
        return False


class SocketServer(object):
    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.connect_state = False
        self.connect_tcp_server()
        self.incomplete_data = {'serial': '', 'log_info': ''}
        self.Exception = 0

    def connect_tcp_server(self):
        """连接服务器"""
        try:
            self.connect_state = False
            ping_result = ipping(self.ip, expect='True')
            if ping_result:
                logger.info('开始连接串口服务器%s:%d' % (self.ip, self.port))
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.ip, self.port))
                logger.info('连接完成')
                self.connect_state = True
            else:
                logger.info('%s串口服务器无法ping通,不进行连接' % self.ip)
        except Exception as e:
            logger.info('连接异常:%s' % e)
            self.connect_state = False

    def get_serial_data(self):
        """接收服务器数据"""
        try:
            recv_info = self.sock.recv(2048).decode("utf-8", "replace")       # 服务器传过来的是bytes类型,收到数据后解码变成str
            self.Exception = 0
            # logger.info(recv_info)
        except Exception as e:
            logger.info('获取数据异常:%s' % e)
            self.Exception = self.Exception + 1
            return False
        info_list = self.analysis_data(recv_info)
        if not info_list:
            return ''
        else:
            return info_list

    def analysis_data(self, recv_info):
        """解析串口日志"""
        if recv_info:
           # 根据与服务器商量好的规则,去解析出日志
           return [{'serial': '', 'log_info': ''}, {'serial': '', 'log_info': ''}, {'serial': '', 'log_info': ''}, {'serial': '', 'log_info': ''}]
            except Exception as e:
                logger.info('解析数据异常:%s,异常数据:%s' % (e, recv_info))
                return []

    def send_data(self, data):
        """发送数据"""
        try:
            self.sock.send(data)
            return True
        except Exception as e:
            logger.info('发送命令异常:%s' % e)
            return False

    def close_con(self):
        """断开socket连接"""
        try:
            logger.info('尝试断开TCP连接')
            self.sock.close()
            logger.info('TCP连接已断开')
            return True
        except Exception as e:
            logger.info('断开TCP连接异常:%s' % e)
            return False


class TextEdit(QTextEdit):
    """继承QTextEdit,重写keyPressEventh函数,通过信号槽将[回车]传出去"""
    signal_enter = pyqtSignal(str)
    def __init__(self):
        QTextEdit.__init__(self)


    def keyPressEvent(self, QKeyEvent):
        if QKeyEvent.key() == Qt.Key_Return or QKeyEvent.key() == Qt.Key_Enter:
            self.signal_enter.emit('enter')
        else:
            super().keyPressEvent(QKeyEvent)


class Tool(QMainWindow):
    """工具类"""
    def __init__(self):
        QMainWindow.__init__(self)
        self.resize(1180, 715)
        self.setFixedSize(self.width(), self.height())
        self.save_log_dir = ''
        self.is_running = ''
        self.see_log = True         # 滚动条是否跟随日志
        self.button_column = 0      # 用于记录button的最大值
        self.top = ''               # 重命名窗口/keyword窗口/推送机器人地址窗口

        self.serial_value = ['serial01', 'serial02', 'serial03', 'serial04', 'serial05', 'serial06',
                             'serial07', 'serial08', 'serial09', 'serial10', 'serial11', 'serial12',
                             'serial13', 'serial14', 'serial15', 'serial16', 'serial17', 'serial18',
                             'serial19', 'serial20', 'serial21', 'serial22', 'serial23', 'serial24']    # 串口列表

        # 告警关键字
        self.keyword_list = self.read_keyword_txt()
        logger.info('获取到的告警关键字为:' + json.dumps(self.keyword_list, ensure_ascii=False))
        # 机器人推送地址
        self.robot_addr = self.read_robot_txt()
        self.is_push_alarm_log = True

        self.tab = {}       # TAB集合
        self.active_tab = ''    # 当前置顶
        self.create_widgets()

        self.conn_thread = Conn_Thread()
        self.conn_thread.updated_log.connect(self.write_log_to_Text)
        self.conn_thread.check_log.connect(self.start_check_log_keyword)
        self.send_thread = Send_data_Thread()
        self.send_thread.data_input_signal.connect(self.append_data_input)

    def create_widgets(self):
        """创建图形化界面"""
        try:
            # 菜单栏
            bar = self.menuBar()
            main_menu = bar.addMenu("菜单栏")
            keyword_menu = QAction("告警关键字", self)
            robot_menu = QAction("机器人地址", self)
            main_menu.addAction(keyword_menu)
            main_menu.addAction(robot_menu)
            keyword_menu.triggered.connect(lambda: self.creat_top_tab('告警关键字'))
            robot_menu.triggered.connect(lambda: self.creat_top_tab('机器人地址'))

            # 第一行
            self.row1 = QHBoxLayout()

            ip_port_and_log_addr = self.work_for_ip_and_log(operate='get')
            if not ip_port_and_log_addr:
                ip_port = "xx.xx.xx.xx:9036"
                log_addr = ""
            else:
                if not ip_port_and_log_addr[0]:
                    ip_port = "xx.xx.xx.xx:9036"
                else:
                    ip_port = ip_port_and_log_addr[0]
                if not ip_port_and_log_addr[1]:
                    log_addr = ""
                else:
                    log_addr = ip_port_and_log_addr[1]
                    self.save_log_dir = log_addr

            # IP地址
            server_url = QLabel('串口服务器地址(IP:端口)')
            self.server_url_text = QLineEdit(ip_port)
            self.server_url_text.setFixedWidth(125)

            # 日志存放目录
            log_dir_label = QLabel('日志存放地址')
            self.log_dir_text = QLineEdit()
            self.log_dir_text.setFixedWidth(180)
            self.log_dir_text.setText(log_addr)
            self.log_dir_btn = QPushButton('选择日志文件夹')
            self.log_dir_btn.clicked.connect(self.selectPath)
            # 连接串口服务器
            start_btn = QPushButton('连接串口服务器')
            start_btn.clicked.connect(self.start_socket_conn)
            stop_btn = QPushButton('停止连接')
            stop_btn.clicked.connect(self.start_stop_conn)

            # 滚动条跟随日志复选框
            self.see_log_btn = QRadioButton('滚动条跟随日志')
            self.see_log_btn.toggled.connect(self.change_see_log)
            self.see_log_btn.setChecked(True)
            # 增加标签、删除标签、重命名
            add_tab = QPushButton('增加标签页')
            add_tab.clicked.connect(self.add_log_text)
            del_tab = QPushButton('删除标签页')
            del_tab.clicked.connect(self.del_log_text)
            rename_tab = QPushButton('重命名')
            rename_tab.clicked.connect(lambda: self.creat_top_tab('重命名'))

            self.row1.addWidget(server_url, 0, Qt.AlignLeft)
            self.row1.addWidget(self.server_url_text, 0, Qt.AlignLeft)
            self.row1.addWidget(log_dir_label, 0, Qt.AlignLeft)
            self.row1.addWidget(self.log_dir_text, 0, Qt.AlignLeft)
            self.row1.addWidget(self.log_dir_btn, 0, Qt.AlignLeft)
            self.row1.addWidget(start_btn, 0, Qt.AlignLeft)
            self.row1.addWidget(stop_btn, 0, Qt.AlignLeft)
            self.row1.addWidget(self.see_log_btn, 0, Qt.AlignLeft)
            self.row1.addWidget(add_tab, 0, Qt.AlignLeft)
            self.row1.addWidget(del_tab, 0, Qt.AlignLeft)
            self.row1.addWidget(rename_tab, 0, Qt.AlignLeft)

            # 第二行 选项卡
            self.row2 = QHBoxLayout()
            self.tabwidget = QTabWidget()
            self.tabwidget.currentChanged.connect(self.get_now_active_tab)
            self.row2.addWidget(self.tabwidget)
            # 读取标签页记录的内容
            interface_data = self.work_for_data('tab', 'read')
            if not interface_data:
                interface_data = {'log1': {'name_str': 'log1', 'serial_value': ''}}
            logger.info(interface_data)
            for name in interface_data:
                # 日志打印框
                self.tab[name] = {}
                self.tab[name]['name_str'] = interface_data[name]['name_str']
                self.tab[name]['tab'] = QWidget()
                self.tabwidget.addTab(self.tab[name]['tab'], interface_data[name]['name_str'])
                layout = QHBoxLayout()
                self.tab[name]['browser'] = QTextBrowser()
                self.tab[name]['browser'].setFixedSize(1000, 520)

                self.tab[name]['browser'].document().setMaximumBlockCount(3000)
                # 串口下拉框
                self.tab[name]['serial_value'] = interface_data[name]['serial_value']
                self.tab[name]['serial'] = QComboBox()
                self.tab[name]['serial'].setFixedWidth(90)
                self.tab[name]['serial'].addItems(self.serial_value)
                self.tab[name]['serial'].activated[str].connect(self.start_change_serial)
                if interface_data[name]['serial_value']:
                    self.tab[name]['serial'].setCurrentIndex(self.serial_value.index(interface_data[name]['serial_value']))
                else:
                    self.tab[name]['serial'].setCurrentIndex(-1)
                layout.addWidget(self.tab[name]['browser'], 1, Qt.AlignLeft | Qt.AlignTop)
                layout.addWidget(self.tab[name]['serial'], 1, Qt.AlignLeft | Qt.AlignTop)
                self.tab[name]['tab'].setLayout(layout)

            self.active_tab = list(interface_data.keys())[0]

            # 第三行
            self.row3 = QHBoxLayout()
            # 命令发送框
            self.data_input = TextEdit()
            self.data_input.setFixedSize(500, 50)
            self.data_input.signal_enter.connect(self.start_send_data)
            # 发送按钮
            send_btn = QPushButton('发送')
            send_btn.setFixedWidth(70)
            send_btn.clicked.connect(self.start_send_data)
            self.row3.addWidget(self.data_input)
            self.row3.addWidget(send_btn, 0, Qt.AlignLeft)

            # 第四行
            self.row4 = QHBoxLayout()
            quick_send_data = self.work_for_data('quick_send')
            if quick_send_data:
                logger.info(quick_send_data)
                self.quick_dict = {}
                for data in quick_send_data:
                    self.quick_dict[data['name']] = QPushButton(data['name'])
                    self.quick_dict[data['name']].setFixedWidth(70)
                    self.quick_dict[data['name']].clicked.connect(partial(self.start_quick_send, data))
                    self.row4.addWidget(self.quick_dict[data['name']], 0, Qt.AlignLeft)
            self.row4.addStretch()

            all_row = QVBoxLayout()
            all_row.addLayout(self.row1)
            all_row.addLayout(self.row2)
            all_row.addLayout(self.row3)
            all_row.addLayout(self.row4)
            # 如果是继承于QMainWindow的主窗口,不能直接使用setLayout方法,就需要下面3行代码
            widget = QWidget()
            widget.setLayout(all_row)
            self.setCentralWidget(widget)
        except Exception as e:
            logger.info('创建图形化界面异常:%s' % e)
            print('创建图形化界面异常:%s' % e)

    def selectPath(self):
        """把获取到的串口目录传入Entry"""
        try:
            m = QFileDialog.getExistingDirectory()
            if m:
                self.log_dir_text.setText(m)
                self.save_log_dir = m
                logger.info(self.save_log_dir)
        except Exception as e:
            logger.info('获取串口目录异常:%s' % e)

    def work_for_data(self, key, work=''):
        """存/取tab字典,取快捷输入数据"""
        try:
            if key == 'tab':
                if work == 'write':
                    save_tab = dict()
                    for i in self.tab:
                        save_tab[i] = {}
                        save_tab[i]['name_str'] = self.tab[i]['name_str']
                        save_tab[i]['serial_value'] = self.tab[i]['serial_value']
                    with open('interface_data.txt', 'w', encoding='utf-8') as f:
                        data = json.dumps(save_tab)
                        f.write(data)
                        return True
                if work == 'read':
                    if os.path.exists('interface_data.txt'):
                        with open('interface_data.txt', 'r', encoding='utf-8') as f:
                            data = f.read()
                            data = json.loads(data)
                            logger.info('读取到的tab:%s' % data)
                            return data
                    else:
                        return ''
            if key == 'quick_send':
                data_list = []
                if os.path.exists('quick_send_data.txt'):
                    with open('quick_send_data.txt', 'r', encoding='utf-8') as f:
                        for i in f:
                            if i[0] != '#':
                                if '*start-' in i:
                                    info = {}
                                    l = i.replace('*start-', '')
                                    name = l[0:l.rfind('=')]
                                    l = l[l.rfind('=') + 1:]
                                    if '\send' in l:
                                        send = True
                                        l = l.replace('\send', '')
                                    else:
                                        send = False
                                    if '\n' in l:
                                        l = l.replace('\n', '')
                                    l = l.split(';')
                                    info['data'] = l
                                    info['name'] = name
                                    info['send'] = send
                                    data_list.append(info)
                    return data_list
                else:
                    return False
        except Exception as e:
            logger.info('对%s进行%s操作异常:%s' % (key, work, e))
            return False

    def work_for_ip_and_log(self, operate=''):
        """获取记录的IP端口信息和日志保存地址信息"""
        try:
            if operate == 'get':
                if not os.path.exists('ip_info_and_log_addr.txt'):
                    logger.info('ip_info_and_log_addr.txt不存在')
                    return ''
                with open('ip_info_and_log_addr.txt', 'r', encoding='utf-8') as f:
                    info = f.read()
                    if info:
                        if '\n' in info:
                            info = info.replace('\n', '')
                        info = info.split(';')
                return info
            if operate == 'set':
                ip_port = self.server_url_text.text()
                log_addr = self.log_dir_text.text()
                with open('ip_info_and_log_addr.txt', 'w', encoding='utf-8') as f:
                    info = ip_port + ';' + log_addr
                    f.write(info)
        except Exception as e:
            logger.info('%s操作ip_info_and_log_addr.txt异常:%s' % (operate, e))
            return ''

    def add_log_text(self):
        """增加多个tab页"""
        try:
            now_log_text_num = 0
            for ii in range(1, 1000):
                if 'log' + str(ii) not in self.tab:
                    now_log_text_num = ii
                    break
            name = 'log' + str(now_log_text_num)
            self.tab[name] = dict()
            self.tab[name]['tab'] = QWidget()
            self.tabwidget.addTab(self.tab[name]['tab'], name)
            self.tab[name]['name_str'] = name
            layout = QHBoxLayout()
            self.tab[name]['browser'] = QTextBrowser()
            self.tab[name]['browser'].setFixedSize(1000, 520)
            self.tab[name]['browser'].document().setMaximumBlockCount(3000)
            # 串口下拉框
            self.tab[name]['serial_value'] = ''
            self.tab[name]['serial'] = QComboBox()
            self.tab[name]['serial'].setFixedWidth(90)
            self.tab[name]['serial'].addItems(self.serial_value)
            self.tab[name]['serial'].activated[str].connect(self.start_change_serial)
            self.tab[name]['serial'].setCurrentIndex(-1)
            layout.addWidget(self.tab[name]['browser'], 1, Qt.AlignLeft | Qt.AlignTop)
            layout.addWidget(self.tab[name]['serial'], 1, Qt.AlignLeft | Qt.AlignTop)
            self.tab[name]['tab'].setLayout(layout)
            self.tabwidget.setCurrentWidget(self.tab[name]['tab'])
            self.active_tab = name

            # 存放tab信息
            result = self.work_for_data('tab', 'write')
            if result:
                logger.info('存放tab信息与interface_data.txt完成')
        except Exception as e:
            logger.info('增加TAB页异常:%s' % e)

    def del_log_text(self):
        """删除当前置顶的tab页"""
        try:
            if len(self.tab) == 1:
                logger.info('只有一个tab页,不可销毁')
                return
            self.get_now_active_tab()
            target_tab = self.active_tab
            self.tabwidget.removeTab(self.tabwidget.currentIndex())
            del self.tab[target_tab]
            self.get_now_active_tab()
            # 存放tab信息
            result = self.work_for_data('tab', 'write')
            if result:
                logger.info('存放tab信息与interface_data.txt完成')
        except Exception as e:
            logger.info('删除TAB页异常:%s' % e)

    def creat_top_tab(self, top_type='重命名'):
        """创建重命名的窗口"""
        try:
            if self.top:
                self.top.close()
                self.top = ''
            if top_type == '重命名':
                self.top = rename_widget('重命名')
                self.top.signal.connect(self.tab_rename)
                self.top.create_widgets(self.tab[self.active_tab]['name_str'])
            elif top_type == '告警关键字':
                self.top = keyword_widget('告警关键字')
                self.top.signal.connect(self.change_keyword_list)
                self.top.create_widgets(self.keyword_list)
            elif top_type == '机器人地址':
                self.top = robot_widget('机器人地址')
                self.top.signal.connect(self.change_robot_addr)
                self.top.create_widgets(self.robot_addr)
            else:
                return
            self.top.show()
        except Exception as e:
            logger.info('创建重命名窗口异常:%s' % e)

    def tab_rename(self, new_name):
        """重命名的操作"""
        try:
            self.top.close()
            if new_name:
                self.get_now_active_tab()
                for i in self.tab:
                    if self.tab[i]['name_str'] == new_name:
                        logger.info('%s名字有重名,不能设置' % new_name)
                        return
                self.tab[self.active_tab]['name_str'] = new_name
                self.tabwidget.setTabText(self.tabwidget.currentIndex(), new_name)
                result = self.work_for_data('tab', 'write')
                if result:
                    logger.info('存放tab信息与interface_data.txt完成')
            else:
                logger.info('重命名窗口返回为空,不设置')
        except Exception as e:
            logger.info('TAB重命名操作异常:%s' % e)

    def change_keyword_list(self, keyword):
        """改变告警关键字的操作"""
        try:
            self.top.close()

            self.keyword_list = []
            if keyword:
                keyword = keyword.split('\n')
                for i in keyword:
                    if i:
                        self.keyword_list.append(i)

            self.write_keyword_txt()
            logger.info('告警关键字更新为:' + json.dumps(self.keyword_list, ensure_ascii=False))
        except Exception as e:
            logger.info('变更检查关键字异常:%s' % e)

    def change_robot_addr(self, robot_addr):
        """改变机器人推送地址的操作"""
        try:
            self.top.close()
            if '\n' in robot_addr:
                robot_addr = robot_addr.replace('\n', '')
            self.robot_addr = robot_addr
            self.write_robot_txt()
            logger.info('机器人推送地址更新为:' + json.dumps(self.robot_addr, ensure_ascii=False))
        except Exception as e:
            logger.info('机器人推送地址变更异常:%s' % e)

    def get_now_active_tab(self):
        """获取当前置顶的TAB页"""
        try:
            active_name = self.tabwidget.tabText(self.tabwidget.currentIndex())
            for i in self.tab:
                if self.tab[i]['name_str'] == active_name:
                    self.active_tab = i
                    logger.info('当前tab为:%s' % self.active_tab)
                    break
        except Exception as e:
            logger.info('获取当前置顶的TAB页异常:%s' % e)

    def change_serial(self):
        """更改输出日志的串口"""
        # 存放tab信息
        try:
            result = self.work_for_data('tab', 'write')
            if result:
                logger.info('存放tab信息与interface_data.txt完成')
        except Exception as e:
            logger.info('更改输出日志的串口异常:%s' % e)

    def change_see_log(self):
        """改变滚动条是否跟随日志"""
        try:
            if self.see_log_btn.isChecked() == True:
                self.see_log = True
                logger.info('切换成跟随日志')
            else:
                self.see_log = False
                logger.info('切换成不跟随日志')
        except Exception as e:
            logger.info('切换跟随日志状态异常:%s' % e)

    def stop_recv_data(self):
        """停止任务"""
        try:
            if self.is_running is False:
                logger.info('stopped')
            else:
                self.is_running = False
                logger.info('stopping')
                try:
                    if sock_conn.close_con():
                        logger.info('stopped')
                except:
                    logger.info('stopped')
        except Exception as e:
            logger.info('结束socket连接异常:%s' % e)

    def send_data_to_server(self, data=''):
        """发送数据给服务器"""
        try:
            if self.is_running:
                self.get_now_active_tab()
                if not self.tab[self.active_tab]['serial_value']:
                    logger.info('无指定串口')
                    return
                if data:
                    if data[-1] not in ['\n', '\r']:
                        logger.info('给数据加回车')
                        data = data + '\n'
                else:
                    data = data + '\n'
                logger.info('给%s发送%s' % (self.tab[self.active_tab]['serial_value'], data))
                data = '*<[%s]>*' % self.tab[self.active_tab]['serial_value'][-2:] + data
                send_result = sock_conn.send_data(bytes(data, encoding='utf8'))
                if not send_result:
                    logger.info('发送数据失败')
                else:
                    logger.info('数据发送成功')
            else:
                logger.info('发送数据失败,未开始任务')
        except Exception as e:
            logger.info('发送数据异常:%s' % e)

    def save_log(self, log_info, serial_port):
        """把收到的日志分别传入不同的log文本中"""
        try:
            if not self.save_log_dir:
                # logger.info('无传入日志目录')
                return
            now_day = datetime.datetime.now().strftime('%Y-%m-%d')
            now_time = datetime.datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
            tab_name = self.find_tab_name(serial_port)
            log_folder = os.path.join(self.save_log_dir, now_day)
            if not os.path.exists(log_folder):
                os.mkdir(log_folder)
            if tab_name:
                log_file = os.path.join(log_folder, '%s_%s_%s.log' % (serial_port, now_day, tab_name))
            else:
                log_file = os.path.join(log_folder, '%s_%s.log' % (serial_port, now_day))
            if '\n' not in log_info[-1] and '\r' not in log_info[-1]:
                log_info = log_info + '\n'
            log_info = now_time + log_info
            with open(log_file, 'a', errors='ignore', newline='', encoding='utf-8') as f:
                f.write(log_info)
        except Exception as e:
            logger.info('保存日志异常:%s' % e)

    def find_tab_name(self, serial_value):
        """通过串口编号寻找TAB名"""
        try:
            target = ''
            for i in self.tab:
                if self.tab[i]['serial_value'] == serial_value:
                    target = self.tab[i]['name_str']
                    return target
            return target
        except Exception as e:
            logger.info('通过串口编号寻找TAB名异常:%s' % e)

    def re_connect(self, times='infinite'):
        """尝试重连"""
        try:
            if times == 'infinite':
                while True:
                    if self.is_running:
                        sock_conn.close_con()
                        sock_conn.connect_tcp_server()
                        for i in range(60):
                            if sock_conn.connect_state:
                                logger.info('重连成功')
                                return True
                            else:
                                time.sleep(5)
                        logger.info('重连失败')
                    else:
                        return False
            else:
                sock_conn.close_con()
                sock_conn.connect_tcp_server()
                if sock_conn.connect_state:
                    logger.info('重连成功')
                else:
                    logger.info('重连失败')
        except Exception as e:
            logger.info('尝试重连异常:%s' % e)

    def write_log_to_Text(self, interface, log_msg):
        """日志动态打印"""
        try:
            if log_msg[-1] not in ['\n', '\r']:
                log_msg = log_msg + '\n'
            logmsg_in = "%s" % log_msg
            self.tab[interface]['browser'].append(logmsg_in)
            if self.active_tab == interface and self.see_log:
                cursot = self.tab[interface]['browser'].textCursor()
                self.tab[interface]['browser'].moveCursor(cursot.End)
        except Exception as e:
            logger.info('日志打印异常:%s' % e)
            logger.info(log_msg)

    def append_data_input(self, data):
        """命令输入框显示出命令"""
        try:
            self.data_input.append(data)
        except Exception as e:
            logger.info('命令输入框输出命令[%s]异常:%s' % (data, e))

    def read_keyword_txt(self, txt_addr=''):
        """读取keyword.txt"""
        try:
            if not txt_addr:
                txt_addr = 'keyword.txt'
            if os.path.exists(txt_addr):
                with open('keyword.txt', 'r', encoding='utf-8') as f:
                    keyword = f.readlines()
                if keyword:
                    keyword_list = []
                    for i in keyword:
                        if i:
                            if '\n' in i or '\r' in i:
                                i = i.rstrip('\n')
                                i = i.rstrip('\r')
                            if i:
                                keyword_list.append(i)
                    return keyword_list
            return []
        except Exception as e:
            logger.info('获取告警关键字异常:%s' % e)
            return []

    def write_keyword_txt(self, txt_addr=''):
        """更新keyword.txt"""
        try:
            if not txt_addr:
                txt_addr = 'keyword.txt'
            with open(txt_addr, 'w', encoding='utf-8') as f:
                f.write('\n'.join(self.keyword_list))
        except Exception as e:
            logger.info('更新告警关键字txt异常:%s' % e)

    def read_robot_txt(self):
        """读取robot.txt"""
        try:
            txt_addr = 'robot.txt'
            if os.path.exists(txt_addr):
                with open(txt_addr, 'r', encoding='utf-8') as f:
                    robot_addr = f.read()
                    if '\n' in robot_addr:
                        robot_addr = robot_addr.replace('\n', '')
                    return robot_addr
            return ''
        except Exception as e:
            logger.info('获取机器人推送地址异常:%s' % e)
            return ''

    def write_robot_txt(self):
        """更新robot.txt"""
        try:
            txt_addr = 'robot.txt'
            with open(txt_addr, 'w', encoding='utf-8') as f:
                f.write(self.robot_addr)
        except Exception as e:
            logger.info('更新机器人推送地址txt异常:%s' % e)

    def check_log_keyword(self, log_info, serial_port):
        """收到的日志搜索告警关键字"""
        try:
            now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            for i in self.keyword_list:
                if i in log_info:
                    tab_name = self.find_tab_name(serial_port)
                    data = '%s:%s串口,别名:%s,出现告警关键字:%s' % (now, serial_port, tab_name, i)
                    self.write_alarm_log(data)
        except Exception as e:
            logger.info('%s-%s搜索关键字异常:%s' % (serial_port, log_info, e))

    def write_alarm_log(self, data):
        """记录告警信息"""
        try:
            alarm_dir = os.path.join(path2, "自动化巡检告警信息")
            if not os.path.exists(alarm_dir):
                os.mkdir(alarm_dir)
            now_day = datetime.datetime.now().strftime("%Y-%m-%d")
            alarm_file = os.path.join(alarm_dir, '%s.txt' % now_day)
            if data:
                with open(alarm_file, 'a+', encoding='utf-8') as f:
                    f.write(data)
                    f.write('\n')
        except Exception as e:
            logger.info('将%s信息写入告警日志异常:%s' % (json.dumps(data, ensure_ascii=False), e))

    def set_push_robot_timer(self, inc):
        """推送告警信息"""
        try:
            if self.is_push_alarm_log:
                alarm_dir = os.path.join(path2, "自动化巡检告警信息")
                now_day = datetime.datetime.now().strftime("%Y-%m-%d")
                alarm_file = os.path.join(alarm_dir, '%s.txt' % now_day)
                if os.path.exists(alarm_file):
                    with open(alarm_file, 'r', encoding='utf-8') as f:
                        alarm_info = f.read()
                        self.push_robot_massage(alarm_info)
            t = threading.Timer(inc, self.set_push_robot_timer, (inc,))
            t.start()
        except Exception as e:
            logger.info('推送告警信息异常:%s' % e)

    def push_robot_massage(self, data):
        """推送信息到企业微信"""
        try:
            if data:
                data = '串口服务器[%s]推送告警信息\n' % title + data
            body = {"msgtype": "text", "text": {"content": data}}
            headers = {"Content-Type": "application/json"}
            requests.post(self.robot_addr, headers=headers, json=body)
            logger.info('机器人[%s]消息发送完毕' % self.robot_addr)
        except Exception as e:
            logger.info('企业微信推送报告异常:%s' % e)
            return False

    def thread_it(self, func, *args):
        """将函数打包进线程"""
        # 创建
        t = threading.Thread(target=func, args=args)
        # 守护 !!!
        t.setDaemon(True)
        # 启动
        t.start()

    def start_socket_conn(self, *args):
        """运行连接串口服务器的线程"""
        self.work_for_ip_and_log(operate='set')
        if not self.is_running:
            self.conn_thread.start()
        else:
            logger.info('self.is_running为True,直接返回')

    def start_stop_conn(self, *args):
        """创建停止任务的线程"""
        self.thread_it(self.stop_recv_data)

    def start_send_data(self):
        """创建发送命令的线程"""
        try:
            data = self.data_input.toPlainText()
            logger.info(data)
            if not data:
                logger.info('无数据')
            self.data_input.clear()
            self.thread_it(self.send_data_to_server, data)
        except Exception as e:
            logger.info('开始发送数据前置操作异常:%s' % e)

    def start_quick_send(self, data):
        """运行点击按钮发送命令的线程"""
        try:
            if self.is_running:
                self.send_thread.data = data
                self.send_thread.start()
            else:
                logger.info('self.is_running为False,直接返回')
        except Exception as e:
            logger.info('快速发送数据操作异常:%s' % e)

    def start_change_serial(self, item):
        """创建改变tab对应串口号的线程"""
        try:
            self.get_now_active_tab()
            if self.tab[self.active_tab]['serial_value'] != item:
                self.tab[self.active_tab]['serial_value'] = item
                self.tab[self.active_tab]['browser'].clear()
                logger.info('更改%s的输出串口为:%s' % (self.active_tab, self.tab[self.active_tab]['serial_value']))
            self.thread_it(self.change_serial)
        except Exception as e:
            logger.info('开始更换串口异常:%s' % e)

    def start_check_log_keyword(self, log_info, serial_port):
        """创建检查告警关键字的线程"""
        self.thread_it(self.check_log_keyword, log_info, serial_port)


class rename_widget(QWidget):
    """重命名的窗口,信号槽返回新的命名"""
    signal = pyqtSignal(str)

    def __init__(self, title):
        super().__init__()
        self.setWindowTitle(title)
        self.resize(300, 80)
        self.setFixedSize(self.width(), self.height())

    def create_widgets(self, old_info):
        try:
            self.row = QVBoxLayout()
            self.name_text = QLineEdit(old_info)
            self.sure_btn = QPushButton('确定')
            self.sure_btn.setFixedWidth(60)
            self.sure_btn.clicked.connect(self.rename_tab)
            self.row.addWidget(self.name_text)
            self.row.addWidget(self.sure_btn, 0, Qt.AlignCenter)
            self.setLayout(self.row)
        except Exception as e:
            logger.info('重命名窗口创建异常:%s' % e)

    def rename_tab(self):
        try:
            new_name = self.name_text.text()
            self.signal.emit(new_name)
        except Exception as e:
            logger.info('传出重命名信号异常:%s' % e)


class keyword_widget(QWidget):
    """修改告警关键字的窗口,信号槽返回新的告警关键字列表"""
    signal = pyqtSignal(str)

    def __init__(self, title):
        super().__init__()
        self.setWindowTitle(title)
        self.resize(700, 350)
        self.setFixedSize(self.width(), self.height())

    def create_widgets(self, keyword):
        try:
            self.row = QVBoxLayout()
            self.keyword_btn = QPushButton('关键字导入')
            self.keyword_btn.setFixedWidth(100)
            self.keyword_btn.clicked.connect(self.select_keyword_txt)
            self.keyword_text = QTextEdit()
            for i in keyword:
                self.keyword_text.append(i)
            self.sure_btn = QPushButton('确定')
            self.sure_btn.setFixedWidth(80)
            self.sure_btn.clicked.connect(self.change_keyword)
            self.row.addWidget(self.keyword_btn)
            self.row.addWidget(self.keyword_text)
            self.row.addWidget(self.sure_btn, 0, Qt.AlignCenter)
            self.setLayout(self.row)
        except Exception as e:
            logger.info('告警关键字窗口创建异常:%s' % e)

    def select_keyword_txt(self):
        """把获取到的告警关键字导入工具"""
        try:
            file = QFileDialog.getOpenFileNames()[0][0]
            if file:
                keyword_list = tool.read_keyword_txt(str(file))
                if keyword_list:
                    self.keyword_text.clear()
                    for i in keyword_list:
                        self.keyword_text.append(i)
        except Exception as e:
            logger.info('告警关键字导入异常:%s' % e)

    def change_keyword(self):
        try:
            keyword = self.keyword_text.toPlainText()
            self.signal.emit(keyword)
        except Exception as e:
            logger.info('传出关键字信号异常:%s' % e)


class robot_widget(QWidget):
    """修改机器人推送地址的窗口,信号槽返回新的机器人推送地址"""
    signal = pyqtSignal(str)

    def __init__(self, title):
        super().__init__()
        self.setWindowTitle(title)
        self.resize(400, 80)
        self.setFixedSize(self.width(), self.height())

    def create_widgets(self, old_info):
        try:
            self.row = QVBoxLayout()
            self.robot_addr = QLineEdit(old_info)
            self.sure_btn = QPushButton('确定')
            self.sure_btn.setFixedWidth(50)
            self.sure_btn.clicked.connect(self.change_keyword)
            self.row.addWidget(self.robot_addr)
            self.row.addWidget(self.sure_btn, 0, Qt.AlignCenter)
            self.setLayout(self.row)
        except Exception as e:
            logger.info('机器人推送地址窗口创建异常:%s' % e)

    def change_keyword(self):
        try:
            robot_addr = self.robot_addr.text()
            self.signal.emit(robot_addr)
        except Exception as e:
            logger.info('传出推送地址信号异常:%s' % e)


class Conn_Thread(QThread):
    """持续接收日志的线程,updated_log信号返回工具类连接打印日志的操作,check_log信号返回工具类连接创建检查关键字线程的操作"""
    updated_log = pyqtSignal(str, str)
    check_log = pyqtSignal(str, str)

    def __init__(self):
        super().__init__()

    def run(self):
        """与串口服务器建立TCP连接,无限循环获取数据"""
        try:
            tool.is_running = True
            ip_port = tool.server_url_text.text().split(':')
            ip = ip_port[0]
            port = int(ip_port[1])
            global sock_conn
            # 这里连不上就直接结束
            sock_conn = SocketServer(ip, port)
            if not sock_conn.connect_state:
                logger.info('未连接上串口服务器')
                logger.info('stopped')
                tool.is_running = False
                return
            while True:
                if tool.is_running:
                    recv_list = sock_conn.get_serial_data()
                    if recv_list is False:
                        if sock_conn.Exception > 10:
                            logger.info('sock接口数据异常超过10次,终止进程')
                            tool.is_running = False
                    if recv_list:
                        try:
                            for info in recv_list:
                                try:
                                    tool.save_log(info['log_info'], info['serial'])
                                except Exception as e:
                                    logger.info('保存日志异常:%s' % e)
                                try:
                                    for i in tool.tab:              # 当识别到有tab页的串口是当前日志的归属,则信号返回通知工具类去打印该日志
                                        if tool.tab[i]['serial_value'] == info['serial']:
                                            self.updated_log.emit(i, info['log_info'])
                                            break
                                except Exception as e:
                                    logger.info('输出日志异常:%s' % e)

                                if tool.keyword_list:
                                    try:
                                        self.check_log.emit(info['log_info'], info['serial'])
                                    except Exception as e:
                                        logger.info('返回检查日志信号异常:%s' % e)

                        except Exception as e:
                            logger.info('处理recv_list异常:%s,recv_list:%s' % (e, recv_list))
                else:
                    logger.info('stopped')
                    break
        except Exception as e:
            logger.info('线程中连接TCP服务器获取数据异常:%s' % e)


class Send_data_Thread(QThread):
    """发送命令的线程,data_input_signal信号通知工具类将命令打印在命令框上"""
    data_input_signal = pyqtSignal(str)

    def __init__(self):
        super().__init__()
        self.data = {}

    def run(self):
        """给串口服务器发送数据"""
        try:
            if not self.data:
                logger.info('线程中未传入数据')
                return
            logger.info(self.data)
            if self.data:
                if len(self.data['data']) == 1:      # 只有一条数据
                    if self.data['send']:   # 自动发送
                        tool.send_data_to_server(data=self.data['data'][0])
                    else:               # 打印出来,不自动发送
                        self.data_input_signal.emit(self.data['data'][0])
            else:               # 大于1条数据,需要发送多次
                for i in self.data['data']:
                    if i == 'p':
                        time.sleep(1)
                    else:
                        if self.data['send']:
                            tool.send_data_to_server(data=i)
                        else:
                            self.data_input_signal.emit(i)
        except Exception as e:
            logger.info('线程中给服务器发送命令异常:%s' % e)


if __name__ == "__main__":
    # 日志初始化
    # 设置输出格式
    formater = logging.Formatter('%(asctime)s [line:%(lineno)d] -%(levelname)s- %(message)s')
    # 定义一个日志收集器
    logger = logging.getLogger('log')
    # 设定级别
    logger.setLevel(logging.DEBUG)
    # 以文件形式输出,初始化对象为handler
    if getattr(sys, 'frozen', False):
        path1 = sys.executable
        path2 = os.path.dirname(path1)
    else:
        path2 = os.path.dirname(os.path.abspath(__file__))
    log_dir = os.path.join(path2, "ToolLogs")
    if not os.path.exists(log_dir):
        os.mkdir(log_dir)
    log = os.path.join(log_dir, "%s.log" % datetime.datetime.now().strftime("%Y%m%d"))
    handler = logging.handlers.RotatingFileHandler(log, maxBytes=20 * 1024 * 1024, backupCount=10)
    # log文件对接输出格式
    handler.setFormatter(formater)
    # 日志收集器logger对接log文件
    logger.addHandler(handler)
    logger.info('')  # 标志

    title = os.path.splitext(os.path.basename(sys.argv[0]))[0]

    app = QApplication(sys.argv)
    tool = Tool()

    tool.setWindowTitle(title)
    screen = QDesktopWidget().screenGeometry()
    size = tool.geometry()

    tool.move((screen.width() - size.width()) / 2, (screen.height() - size.height()) / 2)
    tool.show()

    # 启动推送告警信息的定时器
    push_robot = threading.Thread(target=tool.set_push_robot_timer, args=(43200,))
    push_robot.setDaemon(True)
    push_robot.start()

    err = app.exec()
    logger.info('检测到程序退出指令')
    logger.info(err)
    sys.exit(err)


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

python3+pyqt5实践历程(一)——基于socket通信与pyqt5的串口工具 的相关文章

随机推荐

  • post使用form-data和x-www-form-urlencoded的本质区别

    一是数据包格式的区别 二是数据包中非ANSCII字符怎么编码 是百分号转码发送还是直接发送 一 application x www form urlencoded 1 它是post的默认格式 使用js中URLencode转码方法 包括将na
  • 修改onnx模型输出示例

    前言 如图是netron github链接 软件中打开的onnx模型 可以看到右边模型的最终输出结果是分类值predict 0而非概率值 那么如何获取中间过程的概率值 或者说怎么把右边的图砍掉一截变成左边的图呢 代码 读入模型 import
  • keras_cv进行数据增强

    使用keras cv来做分类数据增强 以下直接上流程 具体的原理和代码上github查看源码及配合tensorflow官网及keras官网来做处理 当前 2022 10 8 这些文档还不是很全 import os import numpy
  • Shell 运行shell脚本的多种方法

    详情地址 运行shell脚本的多种方法 小步教程 Shell 运行shell脚本的多种方法 运行shell脚本文件可通过两类方法 方法1 bash执行 语法 sh 文件 文件可使用相对路径或绝对路径 示例 sh 01hello sh 等价写
  • 游戏出现GetThreadContext failed报错 Unity开发

    解决方案 1 检查是否有360 有的情况 1 简单方案 卸载360 2 专业方案 将游戏exe添加到360信任名单中 解释 360会将一些模拟按键视为木马 然后游戏运行一般直接闪退 2 检查防火墙 专业方案 将游戏exe加入防火墙允许应用的
  • 多端技术栈uniapp开发优势是什么?适合哪种类型产品开发?

    uniapp是一种基于Vue js的跨平台开发框架 它可以支持以单一代码库编写多个平台的应用程序 包括iOS Android Web等 以下是uniapp开发的优势和适用类型的介绍 1 跨平台开发 相比于传统的原生开发 uniapp可以基于
  • tar 打包、压缩和备份

    如何理解 首先讲两个概念 打包 将一大堆文件或目录变成一个总的文件 压缩 将一个大的文件通过压缩算法变成一个小文件 这两种场景一定要区分开 网络上有的技术文章 将tar命令解释为压缩命令 是不完全正确的 关于此点 本文不再拓展 感兴趣的可以
  • 黄页是什么意思

    黄页 起源于北美洲 1880年世界上第一本黄页电话号簿在美国问世 至今已有100多年的历史 黄页是国际通用按企业性质和产品类别编排的工商电话号码薄 相当于一个城市或地区的工商企业的户口本 国际惯例用黄色纸张印制 故称黄页 目前我们常说的黄页
  • Python 类型提示和静态类型检查

    介绍 在本文中 将了解 Python 类型检查 Type Checking 在本教程中 将了解以下内容 类型注释和类型提示 将静态类型添加到代码中 包括你的代码和其他人的代码 运行静态类型检查器 在运行时强制类型 视频介绍如下 Python
  • 树莓派软键盘乱码

    树莓派软键盘乱码的快速处理 matchbox keyboard的显示 处理办法 matchbox keyboard的显示 正常的Matchbox keyboard安装完成后应该出现如下的界面 但是 在初次安装时 发现部分用户的界面出现乱码情
  • react,useEffect一直重复执行

    import useState useEffect from react useEffect callback arr useEffect接受两个参数 callback 回调函数 第一次会默认执行一次 内部可以return一个回调函数 当卸
  • 客户端和服务端端口的建立与连接

    socket 建立通信的端口 并返回引用该端口的文件描述符 man sockst https man7 org linux man pages man2 socket 2 html 头文件 include
  • nacos服务中断导致项目无法连接,就算nacos服务恢复也不会自动注册,springboot要如何配置nacos自动重连?...

    您可以在配置文件中 例如 application properties 或 application yml 中添加如下配置来启用 Nacos 自动重连 spring cloud nacos discovery retry enabled t
  • [网络安全提高篇] 一〇九.津门杯CTF的Web Write-Up万字详解(SSRF、文件上传、SQL注入、代码审计、中国蚁剑)

    这是作者网络安全自学教程系列 主要是关于安全工具和实践操作的在线笔记 特分享出来与博友们学习 希望您喜欢 一起进步 这篇文章主要介绍5月9日参加津门杯CTF题目知识 包括power cut hate php Go0SS HploadHub和
  • 一步一步写算法(之单向链表)

    声明 版权所有 欢迎转载 请勿用于商业用途 联系信箱 feixiaoxing 163 com 有的时候 处于内存中的数据并不是连续的 那么这时候 我们就需要在数据结构中添加一个属性 这个属性会记录下面一个数据的地址 有了这个地址之后 所有的
  • 四种方法让你的Boost电路更安全

    开关电源最常见的三种结构布局是降压 buck 升压 boost 和降压 升压 buck boost 这三种布局都不是相互隔离的 今天介绍的主角是boost升压电路 the boost converter 或者叫step up convert
  • 宜搭低代码开发师(高级)创建待办列表应用 流程截图及实例代码(避坑专用)

    目录 目标 操作步骤 一 主要涉及的接口 二 代码及说明步骤 目标 试题截图及步骤代码说明 很快完成考试
  • 蓝桥杯常用知识点

    datetime 库 import datetime 设置时间 start datetime date 1901 1 1 不算这一天 是从1900 12 31开始的 end datetime date 2001 1 1 到2000 12 3
  • Tesseract5.0.0+OpenCV3+VS2019安装、字符识别学习

    Tesseract5 0 0 OpenCV3 VS2019安装 字符识别学习 背景 Visual Studio 2019安装 OpenCV3安装 配置 Tesseract v5 0 0安装 在Windows PowerShell中下指令识别
  • python3+pyqt5实践历程(一)——基于socket通信与pyqt5的串口工具

    python3 pyqt5实践历程 一 基于socket通信与pyqt5的串口工具 文章目录 系列文章目录 制作背景 最终功能 工具截图展示 代码详解 系列文章目录 python3 tkinter实践历程 一 基于requests与tkin