我们有一个巨大的 DAG,其中有许多小而快速的任务和一些大而耗时的任务。
我们只想运行 DAG 的一部分,我们发现最简单的方法是不添加我们不想运行的任务。问题是我们的 DAG 有很多相互依赖关系,因此当我们想要跳过某些任务时,不破坏 DAG 就成为了一个真正的挑战。
有没有办法默认为任务添加状态? (对于每次运行),类似:
# get the skip list from a env variable
task_list = models.Variable.get('list_of_tasks_to_skip')
dag.skip(task_list)
or
for task in task_list:
task.status = 'success'
正如评论中提到的,您应该使用BranchPythonOperator
(or ShortCircuitOperator
)以防止执行耗时的任务。如果需要运行这些耗时任务的下游算子,可以使用TriggerRule.ALL_DONE
让这些运算符运行,但请注意,即使上游运算符失败,它也会运行。
您可以使用气流变量来影响这些BranchPythonOperators
无需更新 DAG,例如:
from airflow.models import Variable
def branch_python_operator_callable()
return Variable.get('time_consuming_operator_var')
and use branch_python_operator_callable
作为 BranchPythonOperator 的 Python 可调用对象。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)