Airflow - 从 dag 上下文回调中解析任务 ID

2024-01-11

起初与dag callback (on_failure_callback and on_success_callback),我以为这会触发success or fail状态时dag完成(如 dag 中所定义)。 但随后它似乎每次都会被实例化task instance并不是dag run,所以如果一个DAG有N个任务,它就会触发这些回调N次。

我正在尝试捕获任务 ID 并将其发送到 Slack。读另一本相关问题 https://stackoverflow.com/questions/45312439/airflow-default-on-failure-callback我想出了以下内容:

def success_msg(context):
    slack.slack_message(context['task_instance']); #send task-id to slack

def failure_msg(context):
    slack.slack_message(context['task_instance']); #send task-id to slack

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}

但它失败了,我应该如何解析上下文变量并允许获取任务ID?


您可以从上下文中使用任务对象访问任务。

context['task']应该是执行此操作的适当方法。要获取任务名称,请使用task_id:

context['task'].task_id

要查找上下文中可用的更多对象,您可以浏览此处的列表:https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Airflow - 从 dag 上下文回调中解析任务 ID 的相关文章

随机推荐