Airflow 在成功后多次重新运行单个任务

2023-12-10

按顺序重新运行任务 (A) 3 次的最佳方法是什么?:

即任务A->任务A->任务A->任务B

我之所以这么问,是因为我将运行另一个单独的数据验证任务 (B),该任务将比较这 3 次单独运行的数据。

这就是我到目前为止所做的:

dag = DAG("hello_world_0", description="Starting tutorial", schedule_interval='* * * * *',
          start_date=datetime(2019, 1, 1),
          catchup=False)

data_pull_1 = BashOperator(task_id='attempt_1', bash_command='echo "Hello World - 1!"',dag=dag)
data_pull_2 = BashOperator(task_id='attempt_2', bash_command='echo "Hello World - 2!"',dag=dag)
data_pull_3 = BashOperator(task_id='attempt_3', bash_command='echo "Hello World - 3!"',dag=dag)

data_validation = BashOperator(task_id='data_validation', bash_command='echo "Data Validation!"',dag=dag)


data_pull_1 >> data_pull_2 >> data_pull_3 >> data_validation

这可能可行,但有更优雅的方法吗?


你可以尝试下面的实现,我们通过使用for循环创建了3个操作

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    "hello_world_0",
    description="Starting tutorial",
    schedule_interval=None,
    start_date=datetime(2019, 1, 1),
    catchup=False
)

chain_operators = []
max_attempt = 3
for attempt in range(max_attempt):
    data_pull = BashOperator(
        task_id='attempt_{}'.format(attempt),
        bash_command='echo "Hello World - {}!"'.format(attempt),
        dag=dag
    )
    chain_operators.append(data_pull)

data_validation = BashOperator(task_id='data_validation', bash_command='echo "Data Validation!"', dag=dag)
chain_operators.append(data_validation)

# Add downstream
for i,val in enumerate(chain_operators[:-1]):
    val.set_downstream(chain_operators[i+1])

我将 Schedule_interval 更改为 None,因为'* * * * *'作业将被不断触发

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Airflow 在成功后多次重新运行单个任务 的相关文章

随机推荐

  • 什么时候使用哪种指针?

    好吧 上次我以写 C 为生时 std auto ptr所有的标准库都可用吗 boost shared ptr风靡一时 我从未真正研究过 boost 提供的其他智能指针类型 据我所知 C 11 现在提供了 boost 提出的一些类型 但不是全
  • 如何使用 rvest 从 google 搜索中检索标题下方的文本

    这是这个问题的后续问题 如何使用 rvest 从 google 搜索中检索标题 这次我试图在谷歌搜索中获取标题后面的文本 用红色圈出 由于我缺乏网页设计知识 我不知道如何制定 xpath 来提取标题下面的文本 AllanCameron 的答
  • 用于解析 XML 中模板标签的正则表达式

    我需要解析一些 XML 以提取嵌入的模板标签以进行进一步解析 不过 我似乎无法改变 Python 的正则表达式来完成我想要的事情 英文 当行中的任何位置包含模板标记时 删除该特定行的所有 XML 只保留模板标记在其位置 我整理了一个测试用例
  • 为菜谱 #2 创建 Django 模型

    这是一个后续问题为菜谱创建 Django 模型 我可以为单个食谱选择多种成分 但我的代码只允许选择与所有所选成分相关联的一般数量选择 例如 BLT 食谱 我可以选择培根 生菜和番茄 但我无法为每种食材选择不同的数量 即培根 1 生菜 1 番
  • 如何获取gatsby布局文件中的路径名

    我正在与gasby这里的主文件始终是layout js这是他们所有人的父母 既然它是父文件那么我怎样才能获得位置道具this props location pathname在里面 这是我的布局组件 class Layout extends
  • 如何保证序列化世界中的Java实例控制(没有枚举)?

    在 Java 1 5 之前的世界中 所以没有enum 并且当我的对象被序列化时 我如何强制执行正确的实例控制 我正在谈论这样的类 据我所知 我不确定instance0和instance1将始终是唯一的实例 import java io Se
  • Java 查找数组中最小和第二小的值

    我正在尝试创建两种方法 一种方法用于查找对象数组中的最小值 另一种方法用于查找对象数组中的第二个最小值 我这样写了两个 public static BanffMarathonRunner getFastestTime BanffMarath
  • 找到两个嵌套列表的交集?

    我知道如何获得两个平面列表的交集 b1 1 2 3 4 5 9 11 15 b2 4 5 6 7 8 b3 val for val in b1 if val in b2 or def intersect a b return list se
  • jQuery UI DatePicker 仅显示月份年份

    我正在使用 jQuery 日期选择器在我的应用程序中显示日历 我想知道是否可以用它来显示月份和年份 2010 年 5 月 而不是日历 这是一个 hack 用整个 html 文件更新
  • MySQL 内连接帮助 - 想要包含不匹配的行

    我有几张表要加入查找 PET id owner id pet name size 1 1 Sparky L 2 1 Spot L 3 3 FooFoo M 4 3 Barky L 5 3 Jeb S OWNER id owner name
  • Haskell Posix 中的多行匹配

    我似乎找不到关于 haskell 的 POSIX 实现的像样文档 具体模块Text Regex Posix 谁能指出我在字符串上使用多行匹配的正确方向 好奇者的一个片段 gt extractToken body body
  • json 路径表达式在没有数组的情况下不起作用

    我正在尝试实现一个看似简单的 JSON 路径过滤器 但未能使其正常工作 想知道其他对 Json NET 的 JSON 路径实现有更多经验的人是否对后续步骤有想法 这种情况失败了 但我认为应该可行 var jsonText event dat
  • 字符串解析,提取数字和字母

    解析字符串并提取数字和字母的最简单方法是什么 我的字符串可以采用以下格式 数字 字母或字母 数字 即 10A B5 C10 1G 等 我需要提取两个部分 即 10A gt 10 和 A Update 感谢大家的精彩回答 最简单的方法可能是使
  • 将用户名和密码作为变量传递

    我希望在 feature 文件中使用变量而不是硬编码 UID 和密码 Background def xmlPayload read request xml def UserID UID def Password PWD def Token
  • 在oracle 11g中收缩数据库

    无论如何 我不是数据库管理员 所以我在这里陈述的一些内容可能是错误的 在 SQL Server 中 当我们在数据库中添加大量数据然后删除它时 数据文件 mdf 文件 或数据库 或任何名称 的大小不会减少到原始大小 我们需要缩小它 同样的基本
  • 需要sql查询中的范围计数[重复]

    这个问题在这里已经有答案了 我有问题详情 请访问http sqlfiddle com 3 8e018 1 我有一张成员表 上面有所有学生的分数 我正在尝试计算所有学生的数量 例如 0 9 学生人数 9 10 19 学生人数 0依此类推 直到
  • Hibernate spring 注释会话未关闭/刷新

    我 继承 了一个项目 该项目使用 Spring 注释来管理 Hibernate 的事务 会话 或者至少本来就是这样 目前 Hibernate 会话永远不会刷新 它们设置为 FLUSH MODE NEVER 并且 DAO 需要手动刷新才能将任
  • 自定义 WP 循环的正确 PHP 语法

    我正在尝试在我的 WordPress 网站中插入排序选项 我已经让它工作了 但需要帮助正确地将它与 WordPress 循环一起使用 目前 我有 在页面上 可以选择按字母顺序或时间顺序排序 a href sort date Newest a
  • 使用 Beautiful Soup 创建属性名为“name”的 new_tag

    我有一个 XML 块 需要将一些元素插入其中
  • Airflow 在成功后多次重新运行单个任务

    按顺序重新运行任务 A 3 次的最佳方法是什么 即任务A gt 任务A gt 任务A gt 任务B 我之所以这么问 是因为我将运行另一个单独的数据验证任务 B 该任务将比较这 3 次单独运行的数据 这就是我到目前为止所做的 dag DAG