如何使用ExternalTask​​Sensor触发Airflow DAG独立运行

2024-02-08

我构建了两个 DAG(dag_a、dag_b),并在 dag_b 中创建了一个刺探 dag_a 的ExternalTask​​Sensor。这些 DAG 有两个用例:

  1. 同时调度dag_a和dag_b,并使用依赖关系先处理dag_a,然后处理dag_b
  2. 手动独立触发dag_b,而不关心dag_a。

使用ExternalTask​​Sensor,用例1 效果很好。但用例 2 不起作用。并且 dag_b 将停止在 dag_a 的ExternalTask​​Sensor 处并永远戳戳。有没有办法在某些条件下跳过ExternalTask​​Sensor并独立运行dag_b?


我能够通过使用来完成此任务execution_date_fn检查 DAG 运行是否是手动触发的,并引发AirflowSkipException如果是这样。当手动触发 DAG 时,传感器任务将被跳过。

在 Airflow 2.1 上测试。

from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import get_current_context
from airflow.sensors.external_task import ExternalTaskSensor


def get_execution_date(execution_date):
    context = get_current_context()
    if context["dag_run"].external_trigger:
        raise AirflowSkipException
    
    return execution_date


default_dag_args = {
    "schedule_interval": "* * * * *",
    "start_date": datetime(2022, 1, 1),
    "catchup": False,
}

with DAG(dag_id="dag_a", **default_dag_args) as dag_a:
    dummy_task = DummyOperator(task_id="dummy_task")

with DAG(dag_id="dag_b", **default_dag_args) as dag_b:
    await_dag_a = ExternalTaskSensor(
        task_id="await_dag_a",
        external_dag_id="dag_a.v1",
        execution_date_fn=get_execution_date,
    )
    
    dummy_task = DummyOperator(
        task_id="dummy_task",
        # enable task to trigger even if parent task is skipped
        trigger_rule="none_failed",
    )

    await_dag_a >> dummy_task
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用ExternalTask​​Sensor触发Airflow DAG独立运行 的相关文章

随机推荐

  • 向 MQ 发送消息时如何删除默认的 Spring JMS 模板标头?

    使用 Java Spring 与 WebSphere MQ 交互并尝试向其发送消息 Spring 不断向其添加以下标头信息 RFH MQSTR
  • 如何更改 SQL Server 的默认语言?

    现在当我查询时 SELECT language 它得到 us english 但我需要俄语 我不能使用SET LANGUAGE russian对于每个查询 我需要默认设置它 对于所有新会话 使用 SQL Server Management
  • Java - 从 JFileChooser 中删除组件(文件类型)

    如何从 JFileChooser 中删除组件 文件类型 标签及其组合框 我有以下代码 JFileChooser fileChooser new JFileChooser fileChooser setFileSelectionMode JF
  • IF 语句中 OR 和 AND 运算符的中断条件

    If 语句和任何其他布尔比较足够智能 可以在评估时在第一个 FALSE 值处停止A and B and C and D评估时首先为 TRUE 值A or B or C or D 这种行为的名称是什么 这是编译器优化吗 如果是这样 有没有办法
  • 如何强制完全下载链接上的txt文件?

    我有一个简单的文本文件 我想在任何锚标记链接上下载该文件 但是当我点击该链接时 txt 文件显示给我但未下载 我已经尝试过这段代码 a href test txt Click here a 单击链接时下载文件 而不是导航到文件 a href
  • 路由错误未初始化常量用户

    我是 Rails 新手 我正在尝试为演示应用程序设置 使用 facebook 登录 我正在使用 OmniAuth 并遵循本教程 https github com plataformatec devise wiki OmniAuth 概述 h
  • 多类模型的准确率、精确度和召回率

    我该如何计算accuracy 精确 and recall对于混淆矩阵中的每个类 我正在使用嵌入式数据集 iris 混淆矩阵如下 prediction setosa versicolor virginica setosa 29 0 0 ver
  • 打印 Windows 窗体

    我继承了一些代码来打印表单的内容 但是在纸上生成的图像似乎有某种阴影 模糊 就好像它试图进行抗锯齿但做得不太好 并且字母在边缘像素化 有谁知道提高最终质量的方法吗 System Drawing Printing PrintDocument
  • Visual Studio 扩展中的 app.config?

    我创建了一个代表 Visual Studio 项目向导 vsix 包 的 Visual Studio 扩展 我正在尝试连接 log4net 但没有成功 我已将问题归结为 app config 未正确加载 我已将其添加到我的 Visual S
  • 棘手的数组初始化

    在 C 不是 C 中 我尝试创建两个包含相同值的字符串表 但以两种不同的方式对值进行排序 而且我不希望字符串在内存中重复 基本上 我想做以下事情 除了根据 gcc 之外 它会失败 因为第二个数组初始化中的 初始化器元素不是常量 有办法解决这
  • 在 WCF/.NET 中返回数据表

    我有一个 WCF 服务 我想从中返回一个数据表 我知道 就返回 DataTable 是否是一个好的实践而言 这通常是一个备受争议的话题 让我们暂时把它放在一边 当我从头开始创建数据表时 如下所示 没有任何问题 该表已创建 填充并返回给客户端
  • 删除工作表(如果存在)并创建一个新工作表

    我想浏览我的 Excel 工作表并找到具有特定名称的工作表 如果找到则删除该工作表 之后 我想在所有具有该名称的现有工作表之后创建一个工作表 我的代码如下 For Each ws In Worksheets If ws Name asdf
  • Scapy生成STP(生成树协议)数据包

    我正在尝试生成STP数据包并使用wireshark捕获它 基本上我所做的是 gt gt gt 从Scapy发送 STP wireshark的结果是 53918 2671 938356000 00 00 00 00 00 00 FC 49 格
  • 启用 NoResize 模式时最小化窗口

    我有一个 WPF 应用程序 其中Window财产ResizeMode设置为NoResize 因此最大化和最小化按钮被隐藏 有没有办法添加最小化按钮 因为我不想允许用户仅调整窗口大小以避免表单上的控件变形 但最小化窗口是一个有用的功能 Set
  • 如何让按钮在div中居中?

    我有一个宽度为 100 的 div 我想在其中放置一个按钮 我该怎么做 div style width 100 height 100 border 1px solid div
  • 隐藏标题页上的幻灯片编号

    我在用 Reveal initialize slideNumber true 是否有可能隐藏标题页上的幻灯片编号 我是这样做的 Reveal addEventListener slidechanged event gt const isFi
  • 标签内的图像
  • 我有一个关于内部浮动元素的问题 li tag 我有以下标记 li li img src concept truck jpg alt 2013 Toyota Tacoma p 2013 Toyota Tacoma p p Price 4500
  • 无法获取 Flask 应用程序中设置的环境变量

    我尝试在 CentOS 中将敏感信息设置为环境变量 并将它们传递给主文件中使用的 Flask 配置文件 即init py 但这没有用 Flask 应用程序在 Apache 下运行 我首先以 root 用户身份编辑 etc environme
  • 使用 LLVM C API 生成对内部函数的调用

    我正在编写一些使用 LLVM C API 的代码 如何使用内在函数 例如llvm cos f64 or llvm sadd with overflow i32 每当我尝试通过生成一个全局来做到这一点LLVMAddGlobal 具有正确的类型
  • 如何使用ExternalTask​​Sensor触发Airflow DAG独立运行

    我构建了两个 DAG dag a dag b 并在 dag b 中创建了一个刺探 dag a 的ExternalTask Sensor 这些 DAG 有两个用例 同时调度dag a和dag b 并使用依赖关系先处理dag a 然后处理dag