我有以下代码,可以读取和写入每个id
依次。
async def main():
while id < 1000:
data = await read_async(id)
await data.write_async(f'{id}.csv')
id += 1
read_async()
需要几分钟并且write_async()
运行时间不到一分钟。现在我想要
- Run
read_async(id)
在平行下。但是,由于内存限制,最多可以并行运行 3 个调用。
-
write_async
必须按顺序运行,即write_async(n+1)
之前不能运行write_async(n)
.
您可以使用队列和固定数量的任务来从主任务中读取和写入。主要任务可以使用事件来查找读者是否可以使用新数据,并使用共享字典从读者那里获取新数据。例如(未经测试):
async def reader(q, id_to_data, data_ready):
while True:
id = await q.get()
data = await read_async(id)
id_to_data[id] = data
data_ready.set()
async def main():
q = asyncio.Queue()
for id in range(1000):
await q.put(id)
id_to_data = {}
data_ready = asyncio.Event()
readers = [asyncio.create_task(reader(q, id_to_data, data_ready))
for _ in 3]
for id in range(1000):
while True:
# wait for the current ID to appear before writing
if id in id_to_data:
data = id_to_data.pop(id)
await data.write_async(f'{id}.csv')
break
# move on to the next ID
else:
# wait for new data and try again
await data_ready.wait()
data_ready.clear()
for r in readers:
r.cancel()
使用单独的结果队列而不是事件队列是行不通的,因为队列是无序的。优先级队列可以解决这个问题,但它仍然会立即返回当前可用的最低 id,而编写者需要nextid 以便按顺序处理所有 id。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)