我想从 Celery 任务返回的列表创建一个组,以便对于任务结果集中的每一项,一个任务将添加到该组中。
这是一个简单的代码示例来解释用例。这???
应该是上一个任务的结果。
@celery.task
def get_list(amount):
# In reality, fetch a list of items from a db
return [i for i in range(amount)]
@celery.task
def process_item(item):
#do stuff
pass
process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))
我可能没有正确处理这个问题,但我很确定从任务内调用任务是不安全的:
@celery.task
def process_list():
for i in get_list.delay().get():
process_item.delay(i)
我不需要秒任务的结果。
您可以使用中间任务来获得这种行为。这是创建类似“地图”的方法的演示,其工作原理如您所建议的那样。
from celery import task, subtask, group
@task
def get_list(amount):
return [i for i in range(amount)]
@task
def process_item(item):
# do stuff
pass
@task
def dmap(it, callback):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()
# runs process_item for each item in the return of get_list
process_list = (get_list.s(10) | dmap.s(process_item.s()))
感谢 Ask Solem,当我就类似问题向他寻求帮助时,他给了我这个建议。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)