Python 3:如何将异步函数提交到线程池?

2023-12-28

我想同时使用ThreadPoolExecutor from concurrent.futures和异步函数。

我的程序重复向线程池提交具有不同输入值的函数。在该较大函数中执行的最终任务序列可以按任何顺序,并且我不关心返回值,只关心它们在将来的某个时刻执行。

所以我尝试这样做

async def startLoop():

    while 1:
        for item in clients:
            arrayOfFutures.append(await config.threadPool.submit(threadWork, obj))

        wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)

其中提交的函数是:

async def threadWork(obj):
   bool = do_something() # needs to execute before next functions
   if bool:
       do_a() # can be executed at any time
       do_b() # ^

where do_b and do_a是异步函数。问题是我收到错误:TypeError: object Future can't be used in 'await' expression如果我删除等待,我会收到另一个错误,说我需要添加await.

我想我可以让一切都使用线程,但我真的不想这样做。


我建议仔细阅读 Python 3异步开发指南 https://docs.python.org/3/library/asyncio-dev.html,特别是“并发和多线程”部分。

示例中的主要概念问题是事件循环是单线程的,因此在线程池中执行异步协程没有意义。事件循环和线程交互有几种方式:

  • 每个线程的事件循环。例如:

     async def threadWorkAsync(obj):
         b = do_something()
         if b:
             # Run a and b as concurrent tasks
             task_a = asyncio.create_task(do_a())
             task_b = asyncio.create_task(do_b())
             await task_a
             await task_b
    
     def threadWork(obj):
         # Create run loop for this thread and block until completion
         asyncio.run(threadWorkAsync())
    
     def startLoop():
         while 1:
             arrayOfFutures = []
             for item in clients:
                 arrayOfFutures.append(config.threadPool.submit(threadWork, item))
    
             wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
    
  • 在执行器中执行阻塞代码。这允许您使用异步 future 而不是上面的并发 future。

     async def startLoop():
         while 1:
             arrayOfFutures = []
             for item in clients:
                 arrayOfFutures.append(asyncio.run_in_executor(
                     config.threadPool, threadWork, item))
    
             await asyncio.gather(*arrayOfFutures)
    
  • 使用线程安全函数将任务提交到跨线程的事件循环。例如,您可以在主线程的运行循环中运行所有异步协程,而不是为每个线程创建运行循环:

     def threadWork(obj, loop):
         b = do_something()
         if b:
             future_a = asyncio.run_coroutine_threadsafe(do_a(), loop)
             future_b = asyncio.run_coroutine_threadsafe(do_b(), loop)
             concurrent.futures.wait([future_a, future_b])
    
     async def startLoop():
         loop = asyncio.get_running_loop()
         while 1:
             arrayOfFutures = []
             for item in clients:
                 arrayOfFutures.append(asyncio.run_in_executor(
                     config.threadPool, threadWork, item, loop))
    
             await asyncio.gather(*arrayOfFutures)
    

    Note:这个示例不应该按字面意思使用,因为它会导致所有协程在主线程中执行,而线程池工作线程只是阻塞。这只是为了展示一个例子run_coroutine_threadsafe() method.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python 3:如何将异步函数提交到线程池? 的相关文章

  • 导入错误:没有名为 _ssl 的模块

    带 Python 2 7 的 Ubuntu Maverick 我不知道如何解决以下导入错误 gt gt gt import ssl Traceback most recent call last File
  • 如何打印没有类型的defaultdict变量?

    在下面的代码中 from collections import defaultdict confusion proba dict defaultdict float for i in xrange 10 confusion proba di
  • Flask 和 uWSGI - 无法加载应用程序 0 (mountpoint='')(找不到可调用或导入错误)

    当我尝试使用 uWSGI 启动 Flask 时 出现以下错误 我是这样开始的 gt cd gt root localhost uwsgi socket 127 0 0 1 6000 file path to folder run py ca
  • 更改自动插入 tkinter 小部件的文本颜色

    我有一个文本框小部件 其中插入了三条消息 一条是开始消息 一条是结束消息 一条是在 单位 被摧毁时发出警报的消息 我希望开始和结束消息是黑色的 但被毁坏的消息 参见我在代码中评论的位置 插入小部件时颜色为红色 我不太确定如何去做这件事 我看
  • 如何等到 Excel 计算公式后再继续 win32com

    我有一个 win32com Python 脚本 它将多个 Excel 文件合并到电子表格中并将其另存为 PDF 现在的工作原理是输出几乎都是 NAME 因为文件是在计算 Excel 文件内容之前输出的 这可能需要一分钟 如何强制工作簿计算值
  • 使用 gcc 在 Linux 上运行线程构建块 (Intel TBB)

    我正在尝试为线程构建块构建一些测试 不幸的是 我无法配置 tbb 库 链接器找不到库 tbb 我尝试在 bin 目录中运行脚本 但这没有帮助 我什至尝试将库文件移动到 usr local lib 但这又失败了 任何的意见都将会有帮助 确定您
  • Python tcl 未正确安装

    我刚刚为 python 安装了graphics py 但是当我尝试运行以下代码时 from graphics import def main win GraphWin My Circle 100 100 c Circle Point 50
  • JavaScript 中的 Promise 有什么意义?

    一个承诺是一个 可能现在可用 或将来可用 或永远不可用的值 来源 MDN 假设我有一个想要处理图片的应用程序 图片已加载 例如在算法在后台使用它之后 或某种其他类型的延迟 现在我想检查一下图片是否可以在future 通过使用承诺 而不是回调
  • 运行多个 scrapy 蜘蛛的正确方法

    我只是尝试使用在同一进程中运行多个蜘蛛新的 scrapy 文档 http doc scrapy org en 1 0 topics practices html但我得到 AttributeError CrawlerProcess objec
  • 在 NumPy 中获取 ndarray 的索引和值

    我有一个 ndarrayA任意维数N 我想创建一个数组B元组 数组或列表 其中第一个N每个元组中的元素是索引 最后一个元素是该索引的值A 例如 A array 1 2 3 4 5 6 Then B 0 0 1 0 1 2 0 2 3 1 0
  • python pandas 中的双端队列

    我正在使用Python的deque 实现一个简单的循环缓冲区 from collections import deque import numpy as np test sequence np array range 100 2 resha
  • Python:字符串不会转换为浮点数[重复]

    这个问题在这里已经有答案了 我几个小时前写了这个程序 while True print What would you like me to double line raw input gt if line done break else f
  • Pandas Dataframe 中 bool 值的条件前向填充

    问题 如何转发 fill boolTruepandas 数据框中的值 如果是当天的第一个条目 True 到一天结束时 请参阅以下示例和所需的输出 Data import pandas as pd import numpy as np df
  • Node.js 中的异步或步骤

    我无法让我的异步代码与 node js 一起使用 尝试异步和步骤库 代码仅返回第一个函数 似乎没有执行其余函数 我究竟做错了什么 thanks var step require step step function f1 console l
  • C# - 当代表执行异步任务时,我仍然需要 System.Threading 吗?

    由于我可以使用委托执行异步操作 我怀疑在我的应用程序中使用 System Threading 的机会很小 是否存在我无法避免 System Threading 的基本情况 只是我正处于学习阶段 例子 class Program public
  • glpk.LPX 向后兼容性?

    较新版本的glpk没有LPXapi 旧包需要它 我如何使用旧包 例如COBRA http opencobra sourceforge net openCOBRA Welcome html 与较新版本的glpk 注意COBRA适用于 MATL
  • 用于运行可执行文件的python多线程进程

    我正在尝试将一个在 Windows 上运行可执行文件并管理文本输出文件的 python 脚本升级到使用多线程进程的版本 以便我可以利用多个核心 我有四个独立版本的可执行文件 每个线程都知道要访问它们 这部分工作正常 我遇到问题的地方是当它们
  • 协方差矩阵的对角元素不是 1 pandas/numpy

    我有以下数据框 A B 0 1 5 1 2 6 2 3 7 3 4 8 我想计算协方差 a df iloc 0 values b df iloc 1 values 使用 numpy 作为 cov numpy cov a b I get ar
  • Python - 字典和列表相交

    给定以下数据结构 找出这两种数据结构共有的交集键的最有效方法是什么 dict1 2A 3A 4B list1 2A 4B Expected output 2A 4B 如果这也能产生更快的输出 我可以将列表 不是 dict1 组织到任何其他数
  • PyAudio ErrNo 输入溢出 -9981

    我遇到了与用户相同的错误 Python 使用 Pyaudio 以 16000Hz 录制音频时出错 https stackoverflow com questions 12994981 python error audio recording

随机推荐

  • ASP.NET Web API 不允许使用冗长的 base64 URI

    我正在尝试从 Android 客户端接收冗长的 base64 字符串 然后将其解码为 Web API 项目中的位图 以将其作为图像上传到 Azure BLOB 存储 但是 该项目返回此消息并拒绝接受参数 请求 URL 太长 HTTP 错误
  • 将 H:M:S 字符转换为数字

    在文本文件中 我有一个包含字符值的字段 如下所示 00 01 53 910 该值实际上是以小时 分钟 秒为单位的时间 我想将其转换为数值 在此示例中 它应为 113 91 秒 尝试在 R 中使用此代码 我得到 1606287714 as n
  • 查询PrincipalSearcher是否包含多个字符串

    我希望能够查询活动目录 给出包含某些单词 例如用户或管理员 的所有组的列表 下面是我到目前为止所得到的 PrincipalContext ctx new PrincipalContext ContextType Domain GroupPr
  • Hibernate中嵌入成员的成员的独特约束

    是否可以在 Hibernate 中的嵌入式类的成员上定义唯一的约束 我需要确保 Nested i1 和 Nested i2 作为一对 组合 是唯一的 Entity Table uniqueConstrains public class Wi
  • 读取 R 中不同目录中最后创建/修改的文件

    我想读取最近在不同目录中修改 或创建 的 CSV 文件 然后将其放入预先存在的单个数据帧 df 总计 中 我有两种目录可供阅读 A LogIIS FOLDER01 files csv 在其他设备上有一个包含多个files csv的文件夹 如
  • Markdown:如何显示预览(比如前N个单词)

    我正在使用 Rails 4 和 Kramdown 但我相信这个问题可以扩展到任何支持 Markdown 的 网络 编程语言 我正在制作一个博客网站 在概述页面上 我想显示每篇文章的开头 由于文章可能很长 我只想展示第一部分 一个简单的想法是
  • 数据模板和泛型类型

    我有一个通用类 public abstract class BaseViewModel
  • Doobie 无法找到或构造类型 T 的 Read 实例

    我正在使用 doobie 查询一些数据 一切正常 如下所示 case class Usuario var documento String var nombre String var contrasena String def getUsu
  • gdb 按行号反汇编

    假设我想反汇编文件 x 的 m n 行 其中文件 x 不在当前上下文中 这个手术是否可行 如果可以 如何进行 注意 我正在 x86 Linux 上工作 您可以使用反汇编命令 m键在其汇编对应行前面显示原始 C 行 disassemble m
  • 如何找到 php 数组中最后一次出现的“needle”

    有一个内置函数用于查找值的数组键 array search http www php net array search 然而 正如您从示例中看到的 该函数仅找到第一个出现的位置 而我需要最后一个出现的位置 有没有内置函数为了这 如果没有的话
  • Mass DM 机器人工作正常,但现在无法发送消息

    几个月前 我和一个叫 Diggy 来自这个社区 的人为我和一些朋友在 BlackDesert Online 上运行的公会编写了一个 MassDM 机器人 一直工作得很好 直到 10 月 28 日停止发送 DM 一开始 它只是将 DM 发送给
  • 使用java将大量数据从数据库导出到.csv时出现问题

    我 谢谢你的关注 我想使用 java 将大量数据 实际上是 600 万行 导出到 csv 文件 该应用程序是一个 swing 应用程序 带有 JPA 使用 toplink ojdbc14 我尝试过使用 缓冲写入器 随机存取文件 文件通道 等
  • pandas - 将函数应用于所有其他行的当前行

    我正在利用 pandas 创建一个数据框 如下所示 ratings pandas DataFrame article a 1 1 0 0 article b 1 0 0 0 article c 1 0 0 0 article d 0 0 0
  • Django - 测试失败时记录

    我有很多单元测试Django https www djangoproject com 如果测试失败 我经常需要查看日志 如果可能的话 在控制台中 我无法真正使用日志文件 因为它变得非常混乱 我现在要做的是 激活控制台日志记录settings
  • 删除字符串末尾的空格但保留换行符

    如何检查Python字符串在任何点是否有新行前有一个空格 如果确实如此 我必须删除该单个空格 但保留新行符号 这可能吗 def remspace my str if len my str lt 2 returns unchanged ret
  • 打开模式时自动聚焦输入不起作用 - React Bootstrap

    我有一个由 3 个组件组成的模态 每个组件代表一个阶段 例如第一个组件正在输入用户的名字 当用户单击 下一步 时 它将转到下一个组件 即输入地址 然后用户单击 下一步 它将带用户到最后阶段 输入昵称 在每一个input来自组件的元素 它将有
  • 如何从内联汇编器调用 Win32 API 函数?

    有人可以告诉我这段代码有什么问题吗 我只是从 kernel32 dll 调用 Sleep 函数 怎么了 我使用的是 Visual Studio 2008 任何帮助将不胜感激 非常感谢 asm mov eax 77e2ef66h push 9
  • 如何附加(或其他方法)大量 HTML 代码?

    我需要附加很多 HTML 代码 为了提高可读性 我不想将所有内容写在一行中 而是将它们拆分为常规 HTML 这大概是 15 个新行之类的 问题是 JavaScript 不允许我这样做 var target post comment this
  • 如何从张量流中的RNN模型中提取细胞状态和隐藏状态?

    我是 TensorFlow 新手 很难理解 RNN 模块 我正在尝试从 LSTM 中提取隐藏 单元状态 对于我的代码 我使用的实现https github com aymericdamien TensorFlow Examples http
  • Python 3:如何将异步函数提交到线程池?

    我想同时使用ThreadPoolExecutor from concurrent futures和异步函数 我的程序重复向线程池提交具有不同输入值的函数 在该较大函数中执行的最终任务序列可以按任何顺序 并且我不关心返回值 只关心它们在将来的