Python asyncio:处理可能无限的列表

2023-12-08

我有以下场景:

  • Python 3.6+
  • 输入数据是从文件中逐行读取的。
  • 协程将数据发送到 API(使用aiohttp)并将调用结果保存到 Mongo (使用motor)。所以有很多 IO 发生。

代码是用async / await,并且对于手动执行的单个调用来说效果很好。

我不知道该怎么做是批量使用输入数据。

All asyncio我见过的例子表明asyncio.wait通过发送有限列表作为参数。但我不能简单地向其发送任务列表,因为输入文件可能有数百万行。

我的场景是通过传送带将数据流向消费者。

我还可以做些什么?我希望程序使用它可以聚集的所有资源来处理文件中的数据,但又不会被淹没。


我的场景是通过传送带将数据流向消费者。我还可以做些什么?

您可以创建固定数量的任务,大致相当于传送带的容量,然后将它们从传送带上弹出queue。例如:

async def consumer(queue):
    while True:
        line = await queue.get()
        # connect to API, Mongo, etc.
        ...
        queue.task_done()

async def producer():
    N_TASKS = 10
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(N_TASKS)
    tasks = [loop.create_task(consume(queue)) for _ in range(N_TASKS)]
    try:
        with open('input') as f:
            for line in f:
                await queue.put(line)
        await queue.join()
    finally:
        for t in tasks:
            t.cancel()

由于与线程不同,任务是轻量级的并且不会占用操作系统资源,因此创建“太多”任务也是可以的。 asyncio 可以顺利处理数千个任务,尽管对于这个任务来说这可能有点过分了——几十个就足够了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python asyncio:处理可能无限的列表 的相关文章

  • 使用 psycopg2 在 python 中执行查询时出现“编程错误:语法错误位于或附近”

    我正在运行 Python v 2 7 和 psycopg2 v 2 5 我有一个 postgresql 数据库函数 它将 SQL 查询作为文本字段返回 我使用以下代码来调用该函数并从文本字段中提取查询 cur2 execute SELECT
  • 通过 Scrapy 抓取 Google Analytics

    我一直在尝试使用 Scrapy 从 Google Analytics 获取一些数据 尽管我是一个完全的 Python 新手 但我已经取得了一些进展 我现在可以通过 Scrapy 登录 Google Analytics 但我需要发出 AJAX
  • 使用 async/await 抛出和捕获异常的正确方法

    全部请拿下面的二维码 Task
  • 将数据从 python pandas 数据框导出或写入 MS Access 表

    我正在尝试将数据从 python pandas 数据框导出到现有的 MS Access 表 我想用已更新的数据替换 MS Access 表 在 python 中 我尝试使用 pandas to sql 但收到错误消息 我觉得很奇怪 使用 p
  • OpenCV Python cv2.mixChannels()

    我试图将其从 C 转换为 Python 但它给出了不同的色调结果 In C Transform it to HSV cvtColor src hsv CV BGR2HSV Use only the Hue value hue create
  • 通过最小元素比较对 5 个元素进行排序

    我必须在 python 中使用元素之间的最小比较次数来建模对 5 个元素的列表进行排序的执行计划 除此之外 复杂性是无关紧要的 结果是一个对的列表 表示在另一时间对列表进行排序所需的比较 我知道有一种算法可以通过 7 次比较 总是在元素之间
  • Flask 会话变量

    我正在用 Flask 编写一个小型网络应用程序 当两个用户 在同一网络下 尝试使用应用程序时 我遇到会话变量问题 这是代码 import os from flask import Flask request render template
  • Android 中 Kotlin 协程的正确使用方式

    我正在尝试使用异步更新适配器内的列表 我可以看到有太多的样板 这是使用 Kotlin 协程的正确方法吗 这个可以进一步优化吗 fun loadListOfMediaInAsync async CommonPool try Long runn
  • 如何从网页中嵌入的 Tableau 图表中抓取工具提示值

    我试图弄清楚是否有一种方法以及如何使用 python 从网页中的 Tableau 嵌入图形中抓取工具提示值 以下是当用户将鼠标悬停在条形上时带有工具提示的图表示例 我从要从中抓取的原始网页中获取了此网址 https covid19 colo
  • 使用 Tkinter 显示 numpy 数组中的图像

    我对 Python 缺乏经验 第一次使用 Tkinter 制作一个 UI 显示我的数字分类程序与 mnist 数据集的结果 当图像来自 numpy 数组而不是我的 PC 上的文件路径时 我有一个关于在 Tkinter 中显示图像的问题 我为
  • Python 函数可以从作用域之外赋予新属性吗?

    我不知道你可以这样做 def tom print tom s locals locals def dick z print z name z name z guest Harry print z guest z guest print di
  • Flask如何获取请求的HTTP_ORIGIN

    我想用我自己设置的 Access Control Allow Origin 标头做出响应 而弄清楚请求中的 HTTP ORIGIN 参数在哪里似乎很混乱 我在用着烧瓶 0 10 1 以及HTTP ORIGIN似乎是这个的特点之一object
  • 在f字符串中转义字符[重复]

    这个问题在这里已经有答案了 我遇到了以下问题f string gt gt gt a hello how to print hello gt gt gt f a a gt gt gt f a File
  • 使用 \r 并打印一些文本后如何清除控制台中的一行?

    对于我当前的项目 有一些代码很慢并且我无法使其更快 为了获得一些关于已完成 必须完成多少的反馈 我创建了一个进度片段 您可以在下面看到 当你看到最后一行时 sys stdout write r100 80 n I use 80覆盖最终剩余的
  • 如何在Python中对类别进行加权随机抽样

    给定一个元组列表 其中每个元组都包含一个概率和一个项目 我想根据其概率对项目进行采样 例如 给出列表 3 a 4 b 3 c 我想在 40 的时间内对 b 进行采样 在 python 中执行此操作的规范方法是什么 我查看了 random 模
  • 向 Altair 图表添加背景实心填充

    I like Altair a lot for making graphs in Python As a tribute I wanted to regenerate the Economist graph s in Mistakes we
  • 如何在seaborn displot中使用hist_kws

    我想在同一图中用不同的颜色绘制直方图和 kde 线 我想为直方图设置绿色 为 kde 线设置蓝色 我设法弄清楚使用 line kws 来更改 kde 线条颜色 但 hist kws 不适用于显示 我尝试过使用 histplot 但我无法为
  • 为字典中的一个键附加多个值[重复]

    这个问题在这里已经有答案了 我是 python 新手 我有每年的年份和值列表 我想要做的是检查字典中是否已存在该年份 如果存在 则将该值附加到特定键的值列表中 例如 我有一个年份列表 并且每年都有一个值 2010 2 2009 4 1989
  • Python 类继承 - 诡异的动作

    我观察到类继承有一个奇怪的效果 对于我正在处理的项目 我正在创建一个类来充当另一个模块的类的包装器 我正在使用第 3 方 aeidon 模块 用于操作字幕文件 但问题可能不太具体 以下是您通常如何使用该模块 project aeidon P
  • 如何使用 Pycharm 安装 tkinter? [复制]

    这个问题在这里已经有答案了 I used sudo apt get install python3 6 tk而且效果很好 如果我在终端中打开 python Tkinter 就可以工作 但我无法将其安装在我的 Pycharm 项目上 pip

随机推荐

  • pty 终端数据包模式 TIOCPKT

    如果我启动一个终端 我如何知道它以什么模式启动 谁决定呢 我可以以数据包模式启动我的终端吗 TIOCPKT 我遇到了这个包模式链接说 Packet mode is enabled by pushing the pckt module on
  • UnicodeDecodeError,无效的连续字节

    为什么以下项目失败 为什么 latin 1 编解码器能够成功 o a test of xe9 char I want this to remain a string as this is what I am receiving v o de
  • 对类中变量的线程安全访问

    在可能有多个线程运行的应用程序中 并且不确定是否在多线程环境下访问这些方法的可能性 但为了安全起见 我做了一个测试类来演示一种情况 一种方法has被编程为线程安全 如果做得正确 也请发表评论 但其余的则不是 在这样的情况下 里面只有一行代码
  • 大 (> 4mb) 文件附件

    使用 Microsoft Graph API 可以将文件附件添加到消息中 如下所述here 但是 由于 REST 请求的总大小限制为 4mb 因此不允许包含非常大的附件 A 可恢复上传会话允许更大的上传 可以通过提供下载链接的参考附件进行引
  • 使用 Android Parse 的多个组合“或”查询

    在 Android 的 Parse 中 是否可以对多个 OR 子查询进行 AND 操作 我一直在尝试组合两个 OR 查询 但没有成功 根据我的研究 Parse 可能不具备这种能力 不过 我还没有找到任何东西来证实 否认这一点 例如 给定一堆
  • 如何使位图透明?

    param bitmap The source bitmap param opacity a value between 0 completely transparent and 255 completely opaque return T
  • Java 8 中方法引用的外部参数

    我希望将外部参数传递给方法引用 String prefix The number is numbers forEach Main printWithPrefix private static void printWithPrefix Int
  • 保护目录免遭直接 URL 访问

    需要一些帮助 我需要保护所有的FOLDERS in a DIRECTORY from direct URL使用权 我可以这样做吗 htaccess如果是 怎么办 或者他们是更安全的方法 这是我的情况 我允许用户上传 pdf 文件 文件发送至
  • MySQL - 组内计数器

    我想根据升序变量为组中的每一行添加一个计数器 我有一个解决方案 但如果组内的某些变量相等 它就不起作用 CREATE TABLE tb g CHAR 1 x INTEGER INSERT INTO tb g x VALUES a 1 a 2
  • 如何使用JNA回调

    我正在使用 JNA 调用 dll 文件的函数 简单DLL h typedef int stdcall eventCallback unsigned int id int value namespace test class hotas pu
  • 在 ARM 资源组中运行的 Webrole 云服务

    到目前为止 我对使用 WebRoles 或 Worker Roles 进行 PaaS 部署的理解 旋转Web角色或工作者角色将创建云服务来管理它 然而 在ARM资源组中 他们没有云服务的概念 那么在ARM资源组中如何管理Web和Worker
  • 如何更改 Microsoft.AspNet.Identity.EntityFramework.IdentityUser 中的 id 类型

    ASP NET MVC 5 EF6 VS2013 我正在想办法将 Id 字段的类型从 string 更改为 int在类型中 Microsoft AspNet Identity EntityFramework IdentityUser 以便让
  • 决策树分类器抛出 KeyError: 'log_loss'

    我使用sklearn的决策树 通常有log loss classifier DecisionTreeClassifier random state 42 class weight balanced criterion log loss cl
  • NSLog 显示前一个日期

    我想检索两个日期之间添加的核心数据中的所有条目 我正在使用NSPredicate 由于我没有得到正确的结果 我尝试记录日期 它显示了以前的日期 在谷歌搜索一段时间后 我添加了 dateFormatter setTimeZone NSTime
  • 将 UITabBar 定位在顶部

    我是 iOS 开发的初学者 我的问题是 是否可以将 UITabBar 放置在顶部以及如何放置 我无法将 UITabBar 放置在视图的顶部 是否可以 当然可以 但它违反了人机界面准则 截图 Code TabController h impo
  • 如何更改 ggplot2 中特定几何图形的大小?

    我有一个包含 2 层的 ggplotgeom point and geom line如下所示 gp lt ggplot data mtcars aes x disp y hp geom point size 3 geom line size
  • 如何在 HTML5 的画布中复制形状?

    我正在尝试使用 HTML5 构建一个半复杂且水平对称的形状 当我试图完成它时 我意识到如果我可以复制一半形状 镜像它并移动它以将两个图像连接在一起会更容易 我正在寻找如何镜像和移动形状的示例 但没有找到如何复制它的示例 显然 我希望不需要两
  • 加载配置文件时,Selenium 测试需要几分钟才能启动

    我只是想弄清楚是否有其他人看到他们的 Selenium 测试在将配置文件加载到 FirefoxDriver 时运行速度明显变慢 需要 2 分钟以上才能启动 如下所示 Selenium 是 Firefox 的默认配置文件 上述帖子的问题发起者
  • 关于dispatch_queue、重入和死锁的澄清

    我需要澄清如何dispatch queues 与重入和死锁有关 阅读这篇博文iOS OS X 上的线程安全基础知识 我遇到了这句话 所有调度队列都是不可重入的 这意味着如果 您尝试在当前队列上进行dispatch sync 那么 可重入和死
  • Python asyncio:处理可能无限的列表

    我有以下场景 Python 3 6 输入数据是从文件中逐行读取的 协程将数据发送到 API 使用aiohttp 并将调用结果保存到 Mongo 使用motor 所以有很多 IO 发生 代码是用async await 并且对于手动执行的单个调