我可以通过编程方式确定 Airflow DAG 是计划的还是手动触发的?

2024-03-27

我想创建一个片段,根据 DAG 是计划的还是手动触发的来传递正确的日期。 DAG 每月运行一次。 DAG 根据上个月的数据生成报告(SQL 查询)。

如果我运行预定的 DAG,我可以使用以下 jinja 片段获取上个月的数据:

execution_date.month

鉴于 DAG 安排在上一周期(上个月)结束时,execution_date 将正确返回上个月。但是,在手动运行时,这将返回当前月份(执行日期将是手动触发的日期)。

我想编写一个简单的宏来处理这种情况。但是我找不到一个好的方法来以编程方式查询 DAG 是否以编程方式触发。我能想到的最好的办法就是获取run_id从数据库(通过创建具有数据库会话的宏),检查是否run_id包含这个词manual。有没有更好的方法来解决这个问题?


tl;dr:您可以通过以下方式确定这一点DagRun.external_trigger.


我注意到在树视图中,有一个关于计划运行的轮廓,但不是手动运行。那是因为后者有stroke-opacity: 0;应用在CSS中。

在仓库中搜索这个,我发现了如何气流开发人员检测手动运行 https://github.com/apache/airflow/blob/6ba672eaab84fd71dc8a6f2dd5791651e5a96c38/airflow/www/templates/airflow/tree.html#L338(5 年的老行,所以也应该在旧版本中工作):

.style("stroke-opacity", function(d) {return d.external_trigger ? "0": "1"})

正在寻找external_trigger带我们到DagRun定义 https://github.com/apache/airflow/blob/dd9f04e152997b7cff56920cb73c1e5b710a6f9d/airflow/models/dagrun.py#L56.

因此,例如,如果您使用的是 Python 回调,则可以有类似这样的内容(可以在 DAG 或单独的文件中定义):

def my_fun(context):
    if context.get('dag_run').external_trigger:
        print('manual run')
    else:
        print('scheduled run')

并在你的Operator设置参数如下:

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    on_failure_callback=my_fun,
    dag=dag,
)

我已经测试过类似的东西并且它有效。

我想你也可以做类似的事情if {{ dag_run.external_trigger }}:- 但我还没有测试过这个,我相信它只适用于该 DAG 的文件。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

我可以通过编程方式确定 Airflow DAG 是计划的还是手动触发的? 的相关文章

随机推荐