使用 CeleryBeat 执行类似 cron 的任务是一个好主意。
我会尝试捕获你的异常get_url_content
微任务。当你抓住它们时,只需归还其他东西即可。这样,您可以在 summarise_task 中评估(例如计数、列出、检查)它们。
如何使用chunks和链条chunks与另一个任务:
步骤 1:将块转换为组:
如中所述http://docs.celeryproject.org/en/latest/userguide/canvas.html#chunks http://docs.celeryproject.org/en/latest/userguide/canvas.html#chunks, .group()
转换类型的对象celery.canvas.chunks
成组,这是 Celery 中更常见的类型。
第 2 步:链接组和任务
中的“通过组合让你大吃一惊”部分http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives提到:
将一个组与另一个任务链接在一起将自动升级
它是一个和弦
这是包含这两个任务的一些代码以及我通常如何调用它们:
@app.task
def solve_micro_task(arg: str) -> dict:
...
@app.task
def summarize(items: List[List[dict]]):
flat_list = [item for sublist in items for item in sublist]
for report in flat_list:
...
chunk_tasks = solve_micro_task.chunks(<your iterator, e.g., a list>), 10) # type: celery.canvas.chunks
summarize_task = summarize.s()
chain(chunk_tasks.group(), summarize_task)()