Python使用threading.Timer实现执行可循环的定时任务

2023-10-29

前言

Python中使用threading.Timer执行定时任务时,执行任务是一次性的,类似于JS中的setTimeout方法。我们对其在封装,改造成可循环的定时器,类似于JS中setInterval方法的效果。

值得注意的是,threading.Timer是非阻塞的,不同于使用time.sleep实现的简单定时任务,而且重要的一点是threading.Timer可以取消定时任务的执行。

源码

本循环定时器只是简单的实现了单任务定时器,并提供了三个基本功能:执行任务、判断任务是否执行中、取消任务,对于多任务可以通过队列、字典等数据结构存放任务实现多任务的功能。

计时器通过函数递归调用?实测不会造成栈溢出,但随着任务循环执行,线程计数器不断递增。

废话不多说,直接上代码:

# -*-coding:utf-8-*-

from threading import Timer


class SimpleIntervalTaskTimer(object):
    """
    简单的循环单任务定时器,非阻塞当前线程
    """

    def __init__(self):
        self.__timer: Timer = None
        self.__seconds = 0
        self.__action = None
        self.__args = None
        self.__kwargs = None

    def run(self, seconds: int, action, args=None, kwargs=None):
        """
        执行循环定时任务

        :param seconds: 任务执行间隔,单位秒
        :param action: 任务函数
        :param args: 函数参数
        """
        if not callable(action):
            raise AttributeError("参数action非法,请传入函数变量")

        if self.is_running():
            print("已有任务在执行,请取消后再操作")
            return

        self.__action = action
        self.__seconds = seconds
        self.__args = args if args is not None else []
        self.__kwargs = kwargs if kwargs is not None else {}

        self.__run_action()

    def __run_action(self):
        self.__timer = Timer(self.__seconds, self.__hook, self.__args, self.__kwargs)
        self.__timer.start()

    def __hook(self, *args, **kwargs):
        self.__action(*args, **kwargs)
        self.__run_action()

    def is_running(self):
        """
        判断任务是否在执行
        """
        return self.__timer and self.__timer.is_alive()

    def cancel(self):
        """
        取消循环定时任务
        """
        if self.is_running():
            self.__timer.cancel()
            self.__timer = None

使用

# -*-coding:utf-8-*-

import threading
import time

import SimpleIntervalTaskTimer


class TestIntervalTaskTimer(object):

    def __run1(self, param):
        print(param)

    def __run2(self, param=None):
        print(param)

    def __run3(self):
        timer = SimpleTaskTimer()
        timer.run(1, self.__run1, args=['111'])
        time.sleep(5)
        timer.cancel()

        timer.run(1, self.__run2, kwargs={'param': '222'})
        time.sleep(5)
        timer.cancel()

    def runTest1(self):
        self.__run3()

    def runTest2(self):
        threading.Thread(target=lambda: self.__run3()).start()

if __name__ == '__main__':
    t = TestIntervalTaskTimer()
    t.runTest1()
    t.runTest2()

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python使用threading.Timer实现执行可循环的定时任务 的相关文章

随机推荐

  • 基于selenium的网易邮箱自动登录获取邮件内容(混杂request\urllib)

    目录 前言 selenium介绍 环境配置 selenium抓取 检查网页 程序实现 前言 163邮箱的访问基于urllib进行爬取时需要自己从网页拿取cookie 但这个cookie是动态的 如果想实时对邮箱进行检查是否有新邮件 cook
  • html怎么引入图片,html怎么引入图片?

    本文介绍了在html中插入图片的方法 主要使用了html中的img标签 通过指定src属性即可插入图片 下面我们就来学习下吧 html怎么引入图片 html中插入图片可以使用img标签来实现 1 新建html文件 如图所示 在body标签中
  • Django、数据库----------ORM框架

    目录 1 安装第三方模块 2 ORM 1 自己创建数据库 2 django连接数据库 3 django操作表 创建表 在models py文件中 删除表以及修改表 在表中新增列时 由于已存在列中可能已有数据 所以新增列必须要指定新增列对应的
  • 离职跳槽,你想清楚了吗?

    近期身边的一个同事提出离职 无论从其个人职业发展 近期的形势 还是他离职要解决的问题来看 这个时候离开并不是一个好的选择 最终还是没有挽留下来 非常之遗憾 此前 也写过一篇工作的思考 读者可以移步阅读 小作文 非技术贴 天下熙熙攘攘 皆为利
  • linux 命令记录

    目录 查看某个文件或目录占用的磁盘空间大小 主要命令 du 可以断点续传的 scp 查看某个文件或目录占用的磁盘空间大小 主要命令 du du ah max depth 1 a 表示目录下所以的文件和文件夹 h 表示人类能看懂的方式 max
  • 高速数字系统时钟设计-AD9516

    此篇是我在学习中做的归纳与总结 其中如果存在版权或知识错误请直接联系我 欢迎留言 PS 本着知识共享的原则 此篇博客可以随意转载 但请标明出处 在高速数字系统中 时钟起到至关重要的作用 它决定系统工作的稳定性与准确性 尤其在包含Zynq 高
  • Keil MDK5实际使用中遇到的一些坑

    最近对一些新出芯片进行开发 编译环境从Keil MDK4升级到Keil MDK5 然后就遇到各种坑 1 程序无法全速运行 无法进入断点 进入不了相应函数 原因 断点打多了 居然没有提示 以前用IAR断点打多了会提示你部分断点会无效 MDK5
  • 带有加密功能的 SQLite Qt 插件

    Qt 已经内置了一个 SQLite 数据库 方便我们开发桌面应用 但是这个 SQLite 是官方提供的开源版本 这意味着这个版本的 SQLite 实际是没有加密功能的 对于一般的桌面应用 数据库加密有时是比较重要的 特别是当你需要对所存储的
  • Java -NIO简介

    Java NIO 在1 4版本之前 Java IO类库是阻塞IO 从1 4版本开始 引进了新的异步IO库 被称为Java New IO类库 简称为JAVA NIO New IO类库的目标 就是要让Java支持非阻塞IO 基于这个原因 更多的
  • Altium Design圆弧走线与敷铜

    Altium Design圆弧走线与敷铜 基本步骤 设置AD优选项 设置输入法 美国键盘 打开Windows设置 选择时间和语言 点击语言 添加首选语言 并安装English 美国 语言包 完成安装 切换输入法 基本步骤 对于我们大部分微软
  • 硬件设计检查事项

    1 原理图逻辑框图 2 原理图电气连接 3 原理图检查 4 PCB外框 5 PCB元件布局 6 PCB布线 7 PCB覆铜 8 DRC校验 9 BGA盘中孔同心 10 元件位号 标注 11 有源期间极性 标注 12 名称 版本 日期 人员
  • splint的安装与使用

    简介 splint是一个GNU免费授权的 Lint程序 是一个动态检查C语言程序安全弱点和编写错误的程序 Splint会进行多种常规检查 包括未使用的变量 类型不一致 使用未定义变量 无法执行的代码 忽略返回值 执行路径未返回 无限循环等错
  • 前端开发者必须知道的 10 个 GitHub 仓库

    内容整理自 ravikmmr 的 Twitter Thread 1 Developer roadmap 初学者如果想学习前端开发 但是不知道从何学起 推荐查看此仓库 你可以获得有关开发的所有学习路线 笔者在之前的文章中对其进行过翻译 2 F
  • Python构建SVM分类器(线性)

    1 SVM建立线性分类器 SVM用来构建分类器和回归器的监督学习模型 SVM通过对数学方程组的求解 可以找出两组数据之间的最佳分割边界 2 准备工作 我们首先对数据进行可视化 使用的文件来自学习书籍配套管网 首先增加以下代码 import
  • 腾讯云阿里云服务器被打进黑洞怎么办

    当腾讯云腾讯云服务器被打进黑洞了我们该怎么办 首先我们要知道以下的这些 黑洞 是什么 黑洞是指服务器受攻击流量超过本机房黑洞阈值时 云计算服务商屏蔽服务器的外网访问 当服务器进入黑洞一段时间后 如果系统监控到攻击流量停止 黑洞会自动解封 进
  • 程序员微信名昵称_微信名字大全

    微信名字 好听的微信名字大全 只求一份安定 无可置疑 吥 恠侑嗳 丶演绎悲伤 一生承诺 简单灬爱 流年灬未亡 舞动D 灵魂 别在我面前犯贱 没有背景丶只有背影 乂日光倾城 丶猫猫er 雪花 飞舞 在哪跌倒 就在哪躺下 淡抹丶悲伤 稀饭你的笑
  • 考研算法题:最短边数最短路

    题目 一个图有很多条最短路 求所有最短路里面的边数最少的最短路的边数 思路1 先求最短路 然后BFS倒推寻找最短边数的最短路的边数 找到直接返回cnt值 include
  • 机器学习- CS 760 Machine Learning

    代码后台私我
  • 【Spring】ERR_INCOMPLETE_CHUNKED_ENCODING 200 (OK) 问题解决

    1 概述 转载 ERR INCOMPLETE CHUNKED ENCODING 200 OK 问题解决 我是在做这个项目的时候遇到这个报错 Spring Spring 网络原因导致日志下载失败 2 简述 浏览器调用接口报错 net ERR
  • Python使用threading.Timer实现执行可循环的定时任务

    前言 Python中使用threading Timer执行定时任务时 执行任务是一次性的 类似于JS中的setTimeout方法 我们对其在封装 改造成可循环的定时器 类似于JS中setInterval方法的效果 值得注意的是 thread