起初与dag callback
(on_failure_callback
and on_success_callback
),我以为这会触发success
or fail
状态时dag
完成(如 dag 中所定义)。
但随后它似乎每次都会被实例化task instance
并不是dag run
,所以如果一个DAG有N个任务,它就会触发这些回调N次。
我正在尝试捕获任务 ID 并将其发送到 Slack。读另一本相关问题 https://stackoverflow.com/questions/45312439/airflow-default-on-failure-callback我想出了以下内容:
def success_msg(context):
slack.slack_message(context['task_instance']); #send task-id to slack
def failure_msg(context):
slack.slack_message(context['task_instance']); #send task-id to slack
default_args = {
[...]
'on_failure_callback': failure_msg,
'on_success_callback': success_msg,
[...]
}
但它失败了,我应该如何解析上下文变量并允许获取任务ID?
您可以从上下文中使用任务对象访问任务。
context['task']
应该是执行此操作的适当方法。要获取任务名称,请使用task_id
:
context['task'].task_id
要查找上下文中可用的更多对象,您可以浏览此处的列表:https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)