我构建了两个 DAG(dag_a、dag_b),并在 dag_b 中创建了一个刺探 dag_a 的ExternalTaskSensor。这些 DAG 有两个用例:
- 同时调度dag_a和dag_b,并使用依赖关系先处理dag_a,然后处理dag_b
- 手动独立触发dag_b,而不关心dag_a。
使用ExternalTaskSensor,用例1 效果很好。但用例 2 不起作用。并且 dag_b 将停止在 dag_a 的ExternalTaskSensor 处并永远戳戳。有没有办法在某些条件下跳过ExternalTaskSensor并独立运行dag_b?
我能够通过使用来完成此任务execution_date_fn
检查 DAG 运行是否是手动触发的,并引发AirflowSkipException
如果是这样。当手动触发 DAG 时,传感器任务将被跳过。
在 Airflow 2.1 上测试。
from datetime import datetime
from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import get_current_context
from airflow.sensors.external_task import ExternalTaskSensor
def get_execution_date(execution_date):
context = get_current_context()
if context["dag_run"].external_trigger:
raise AirflowSkipException
return execution_date
default_dag_args = {
"schedule_interval": "* * * * *",
"start_date": datetime(2022, 1, 1),
"catchup": False,
}
with DAG(dag_id="dag_a", **default_dag_args) as dag_a:
dummy_task = DummyOperator(task_id="dummy_task")
with DAG(dag_id="dag_b", **default_dag_args) as dag_b:
await_dag_a = ExternalTaskSensor(
task_id="await_dag_a",
external_dag_id="dag_a.v1",
execution_date_fn=get_execution_date,
)
dummy_task = DummyOperator(
task_id="dummy_task",
# enable task to trigger even if parent task is skipped
trigger_rule="none_failed",
)
await_dag_a >> dummy_task
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)