我有以下场景:
- Python 3.6+
- 输入数据是从文件中逐行读取的。
- 协程将数据发送到 API(使用
aiohttp
)并将调用结果保存到 Mongo (使用motor
)。所以有很多 IO 发生。
代码是用async
/ await
,并且对于手动执行的单个调用来说效果很好。
我不知道该怎么做是批量使用输入数据。
All asyncio
我见过的例子表明asyncio.wait
通过发送有限列表作为参数。但我不能简单地向其发送任务列表,因为输入文件可能有数百万行。
我的场景是通过传送带将数据流向消费者。
我还可以做些什么?我希望程序使用它可以聚集的所有资源来处理文件中的数据,但又不会被淹没。
我的场景是通过传送带将数据流向消费者。我还可以做些什么?
您可以创建固定数量的任务,大致相当于传送带的容量,然后将它们从传送带上弹出queue。例如:
async def consumer(queue):
while True:
line = await queue.get()
# connect to API, Mongo, etc.
...
queue.task_done()
async def producer():
N_TASKS = 10
loop = asyncio.get_event_loop()
queue = asyncio.Queue(N_TASKS)
tasks = [loop.create_task(consume(queue)) for _ in range(N_TASKS)]
try:
with open('input') as f:
for line in f:
await queue.put(line)
await queue.join()
finally:
for t in tasks:
t.cancel()
由于与线程不同,任务是轻量级的并且不会占用操作系统资源,因此创建“太多”任务也是可以的。 asyncio 可以顺利处理数千个任务,尽管对于这个任务来说这可能有点过分了——几十个就足够了。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)