中断当前正在执行的所有 asyncio.sleep

2024-02-02

where

这是在 Linux、Python 3.5.1 上。

what

我正在开发一个监控流程asyncio,他们在不同地方的任务await on asyncio.sleep不同时长的呼叫。

有时我希望能够打断所有所说的话asyncio.sleep调用并让所有任务正常进行,但我找不到如何做到这一点。一个示例是监控进程的正常关闭。

如何(失败的假设)

我以为我可以发送一个警报信号来达到这种效果,但进程终止了。我尝试用以下命令捕获警报信号:

def sigalrm_sent(signum, frame):
    tse.logger.info("got SIGALRM")

signal.signal(signal.SIGALRM, sigalrm_sent)

然后我得到了关于捕获 SIGALRM 的日志行,但是asyncio.sleep通话不会中断。

如何(拼凑)

此时我已经全部替换了asyncio.sleep调用此协程的调用:

async def interruptible_sleep(seconds):
    while seconds > 0 and not tse.stop_requested:
        duration = min(seconds, tse.TIME_QUANTUM)
        await asyncio.sleep(duration)
        seconds -= duration

所以我只需要选择一个TIME_QUANTUM不算太小,也不算太大。

but

有没有办法中断所有运行asyncio.sleep打电话而我错过了?


中断所有正在运行的调用asyncio.sleep似乎有点危险,因为它可以用于代码的其他部分,用于其他目的。相反,我会专门做一个sleep跟踪其正在运行的调用的协程。然后可以通过取消相应的任务来中断它们:

def make_sleep():
    async def sleep(delay, result=None, *, loop=None):
        coro = asyncio.sleep(delay, result=result, loop=loop)
        task = asyncio.ensure_future(coro)
        sleep.tasks.add(task)
        try:
            return await task
        except asyncio.CancelledError:
            return result
        finally:
            sleep.tasks.remove(task)

    sleep.tasks = set()
    sleep.cancel_all = lambda: sum(task.cancel() for task in sleep.tasks)
    return sleep

Example:

async def main(sleep, loop):
    for i in range(10):
        loop.create_task(sleep(i))
    await sleep(3)
    nb_cancelled = sleep.cancel_all()
    await asyncio.wait(sleep.tasks)
    return nb_cancelled

sleep = make_sleep()
loop = asyncio.get_event_loop()
result = loop.run_until_complete(main(sleep, loop)) 
print(result)  # Print '6'

出于调试目的,loop.time = lambda: float('inf')也有效。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

中断当前正在执行的所有 asyncio.sleep 的相关文章

  • SQLAlchemy 通过关联对象声明式多对多自连接

    我有一个用户表和一个朋友表 它将用户映射到其他用户 因为每个用户可以有很多朋友 这个关系显然是对称的 如果用户A是用户B的朋友 那么用户B也是用户A的朋友 我只存储这个关系一次 除了两个用户 ID 之外 Friends 表还有其他字段 因此
  • 使用带有关键字参数的 map() 函数

    这是我尝试使用的循环map功能于 volume ids 1 2 3 4 5 ip 172 12 13 122 for volume id in volume ids my function volume id ip ip 我有办法做到这一点
  • Django:按钮链接

    我是一名 Django 新手用户 尝试创建一个按钮 单击该按钮会链接到我网站中的另一个页面 我尝试了一些不同的例子 但似乎没有一个对我有用 举个例子 为什么这不起作用
  • Flask 会话变量

    我正在用 Flask 编写一个小型网络应用程序 当两个用户 在同一网络下 尝试使用应用程序时 我遇到会话变量问题 这是代码 import os from flask import Flask request render template
  • 如何使用Conda下载python包并随后离线安装?

    我知道通过 pip 我可以使用以下命令下载 Python 包 但 pip install 破坏了我的内部包依赖关系 当我做 pip download
  • 从字符串中删除识别的日期

    作为输入 我有几个包含不同格式日期的字符串 例如 彼得在16 45 我的生日是1990年7月8日 On 7 月 11 日星期六我会回家 I use dateutil parser parse识别字符串中的日期 在下一步中 我想从字符串中删除
  • Android 中 Kotlin 协程的正确使用方式

    我正在尝试使用异步更新适配器内的列表 我可以看到有太多的样板 这是使用 Kotlin 协程的正确方法吗 这个可以进一步优化吗 fun loadListOfMediaInAsync async CommonPool try Long runn
  • 使用 on_bad_lines 将 pandas.read_csv 中的无效行写入文件

    我有一个 CSV 文件 我正在使用 Python 来解析该文件 我发现文件中的某些行具有不同的列数 001 Snow Jon 19801201 002 Crom Jake 19920103 003 Wise Frank 19880303 l
  • 是否可以忽略一行的pyright检查?

    我需要忽略一行的pyright 检查 有什么特别的评论吗 def create slog group SLogGroup data Optional dict None SLog insert one SLog group group da
  • SQLALchemy .query:类“Car”的未解析属性引用“query”

    我有一个这里已经提到的问题https youtrack jetbrains com issue PY 44557 https youtrack jetbrains com issue PY 44557 但我还没有找到解决方案 我使用 Pyt
  • 如何使用 OpencV 从 Firebase 读取图像?

    有没有使用 OpenCV 从 Firebase 读取图像的想法 或者我必须先下载图片 然后从本地文件夹执行 cv imread 功能 有什么办法我可以使用cv imread link of picture from firebase 您可以
  • 绘制方程

    我正在尝试创建一个函数 它将绘制我告诉它的任何公式 import numpy as np import matplotlib pyplot as plt def graph formula x range x np array x rang
  • BeautifulSoup 中的嵌套标签 - Python

    我在网站和 stackoverflow 上查看了许多示例 但找不到解决我的问题的通用解决方案 我正在处理一个非常混乱的网站 我想抓取一些数据 标记看起来像这样 table tbody tr tr tr td td td table tr t
  • 如何在ipywidget按钮中显示全文?

    我正在创建一个ipywidget带有一些文本的按钮 但按钮中未显示全文 我使用的代码如下 import ipywidgets as widgets from IPython display import display button wid
  • Google BQ:运行参数化查询,其中参数变量是 BQ 表目标

    我正在尝试从 Linux 命令行为 BQ 表目标运行 SQL 此 SQL 脚本将用于多个日期 客户端和 BQ 表目标 因此这需要在我的 BQ API 命令行调用中使用参数 标志 parameter 现在 我已经点击此链接来了解参数化查询 h
  • 无法在 Python 3 中导入 cProfile

    我试图将 cProfile 模块导入 Python 3 3 0 但出现以下错误 Traceback most recent call last File
  • 每个 X 具有多个 Y 值的 Python 散点图

    我正在尝试使用 Python 创建一个散点图 其中包含两个 X 类别 cat1 cat2 每个类别都有多个 Y 值 如果每个 X 值的 Y 值的数量相同 我可以使用以下代码使其工作 import numpy as np import mat
  • 如何在 Python 中追加到 JSON 文件?

    我有一个 JSON 文件 其中包含 67790 1 kwh 319 4 现在我创建一个字典a dict我需要将其附加到 JSON 文件中 我尝试了这段代码 with open DATA FILENAME a as f json obj js
  • 在 Qt 中自动调整标签文本大小 - 奇怪的行为

    在 Qt 中 我有一个复合小部件 它由排列在 QBoxLayouts 内的多个 QLabels 组成 当小部件调整大小时 我希望标签文本缩放以填充标签区域 并且我已经在 resizeEvent 中实现了文本大小的调整 这可行 但似乎发生了某
  • 从列表指向字典变量

    假设你有一个清单 a 3 4 1 我想用这些信息来指向字典 b 3 4 1 现在 我需要的是一个常规 看到该值后 在 b 的位置内读写一个值 我不喜欢复制变量 我想直接改变变量b的内容 假设b是一个嵌套字典 你可以这样做 reduce di

随机推荐

  • Eclipse:在 GWT 项目的版本控制中存储哪些文件

    我正在 Eclipse 中使用 Mercurial 进行 GWT 项目以进行版本控制 我应该在版本控制下存储哪些文件 或者 也许更简洁地说 我应该哪些文件not存储 因为它们要么是 GWT 的一部分 要么是构建过程的工件 我正在使用 Ecl
  • 使用 if(isset($_POST['submit'])) 在脚本打开时不显示回显不起作用

    我的方法有点问题if isset POST submit 代码 我想要的是一些回声和一个表格 当脚本打开时不会出现 但我确实希望它在单击表单的提交按钮时显示 问题是当我包括if isset POST submit 函数 当我单击提交按钮时
  • 如何将 microbit 与 BLE 连接并监听按钮按下事件?

    2021 年 11 月 28 日编辑 如果您需要使用蓝牙低功耗将 microbit 连接到计算机 并在单击按钮时执行操作 直接跳并跟随 ukBaz https stackoverflow com users 7721752 ukbaz的回答
  • .net 4.0 中 MemoryCache 与 ObjectCache 有何区别?

    NET框架4 0有什么区别MemoryCache vs ObjectCache 在哪里使用哪个对象 ObjectCache 是一个抽象类 它演示了如何构建一个符合编写 ObjectCache 的人希望您遵守的规则的缓存 您不能直接实例化 O
  • 如何在共享主机上设置 cakephp?

    这是我用 Cake 无法做到的一件事 我已经尝试了几次 但无法弄清楚 我在 hostgator 上 如果有人能指出他们如何设置它 高级安装 它会有所帮助 Edit 我已阅读文档并收到错误 我想我读得还不够好 Edit我刚刚找到这个帖子 ht
  • Graphics2D:我应该使用 int 版本还是 float 版本?

    一些Graphics2D方法 例如drawString 有将坐标作为的版本int or float 有什么理由选择其中之一 同样 我应该使用较新的Shape类 例如Rectangle2D 使用浮点坐标 或使用Rectangle 将坐标定义为
  • ffmpeg 中的去隔行

    我已按照教程进行操作here http dranger com ffmpeg 将视频文件加载到 C 程序中 但帧不是去隔行的 据我所知 ffmpeg 可执行文件支持 deinterlace 开关 我如何在代码中执行此操作 我应该阅读哪些库
  • 如何将按钮名称绑定到内容?

    我有一个按钮列表
  • XSLT 复制所有节点,并按分隔符分割

    我正在寻找一个执行以下操作的 xslt 以输入 xml 为例
  • 无法与任何提供的主机建立套接字

    我正在努力解决 android 中的文件传输问题 我正在使用 smack 4 1 连接到 openfire 服务器 我的问题是 当我使用 Spark 到 Spark 文件传输时 它工作正常 但是当我从Spark 到 Android 或 An
  • 如何在 django 自定义身份验证后端访问请求?

    我想用 django 的身份验证执行以下操作 记录错误的登录尝试 在 x 次错误登录尝试后暂时锁定帐户 记录成功登录 我认为自定义身份验证后端将是解决方案 我可以做我想做的大部分事情 但我想记录进行尝试的用户的 IP 和 REMOTE HO
  • Excel Yield 函数的.NET 实现

    Excel 的名为 分析工具库 的插件提供了 收益率 函数 用于计算定期支付利息的证券的收益率 函数运行良好并返回正确的数据 我的理解是基于迭代的函数 在我的代码中实现它并不容易 我的问题是有人知道 见过 C 最终是其他语言 的实现并可以分
  • 在 Groovy 中获取由字符分隔的子字符串

    考虑下面的字符串 String names Bharath Vinayak Harish Punith 我想以它仅包含的字符串形式获得输出Bharath 字符串直到第一次出现 运算符 任何人都可以告诉我 我们该怎么做 在一般情况下 我同意s
  • Python 列表理解代价高昂

    我试图找到列表理解的效率 但它看起来比普通函数操作更昂贵 有人可以解释一下吗 def squares values lst for x in range values lst append x x return lst def main t
  • 如何在没有 root 访问权限的情况下在本地安装 CPAN 模块(DynaLoader.pm 第 229 行错误)?

    不能与其他模块一起使用 但举个例子 我使用 CPAN 设置安装了 Text CSV XS makepl arg gt q PREFIX lib 当我尝试运行 test pl 脚本时 perl 测试 pl usr bin perl use l
  • 计算n的最佳方法选择k?

    评估 价值 最有效的方法是什么 n choose k 我认为的蛮力方法是找到n k n k 通过单独计算每个阶乘 更好的策略可能是根据这个使用DP递归公式 https i stack imgur com Kq3OH png nCk n 1
  • WHERE IN问题中的SQL占位符,插入字符串失败

    作为我工作的一部分 我需要编写 SQL 查询来连接到我们的 PI 数据库 要生成查询 我需要传递一个array标签 本质上是主键 但这些必须作为字符串插入 由于这将是一个模块化查询并用于多个标签 因此使用了占位符 该查询依赖于 WHERE
  • OpenGL - ARB 扩展

    我使用的是 MacBook Pro 13 英寸 2010 年中 并且使用 OpenGL 我注意到 库中缺少一些功能 我在互联网上找到了有关我的硬件的规格 上面写着 支持OpenGL 3 3 这很奇怪 所以我打印了我的 OpenGL 版本并这
  • 使用deathbycaptcha服务处理Google recaptcha v2时如何控制scrapy中的请求流?

    你好 我正在使用 python 使用 scrapy 网络爬行框架 抓取网站并使用 Deathbycaptcha 服务解决我在其页面上遇到的验证码 我的下载延迟设置为 30 秒 我只抓取几页来获取基本信息 这样我就不会过多地占用网站带宽或任何
  • 中断当前正在执行的所有 asyncio.sleep

    where 这是在 Linux Python 3 5 1 上 what 我正在开发一个监控流程asyncio 他们在不同地方的任务await on asyncio sleep不同时长的呼叫 有时我希望能够打断所有所说的话asyncio sl