气流,在 dag 运行之前标记任务成功或跳过它

2024-04-25

我们有一个巨大的 DAG,其中有许多小而快速的任务和一些大而耗时的任务。

我们只想运行 DAG 的一部分,我们发现最简单的方法是不添加我们不想运行的任务。问题是我们的 DAG 有很多相互依赖关系,因此当我们想要跳过某些任务时,不破坏 DAG 就成为了一个真正的挑战。

有没有办法默认为任务添加状态? (对于每次运行),类似:

# get the skip list from a env variable    
task_list = models.Variable.get('list_of_tasks_to_skip')

dag.skip(task_list)

or

for task in task_list:
    task.status = 'success'

正如评论中提到的,您应该使用BranchPythonOperator (or ShortCircuitOperator)以防止执行耗时的任务。如果需要运行这些耗时任务的下游算子,可以使用TriggerRule.ALL_DONE让这些运算符运行,但请注意,即使上游运算符失败,它也会运行。

您可以使用气流变量来影响这些BranchPythonOperators无需更新 DAG,例如:

from airflow.models import Variable

def branch_python_operator_callable()
  return Variable.get('time_consuming_operator_var')

and use branch_python_operator_callable作为 BranchPythonOperator 的 Python 可调用对象。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

气流,在 dag 运行之前标记任务成功或跳过它 的相关文章

  • 将 pandas 数据框中的列减去其第一个值

    我需要将 pandas 数据帧的一列中的所有元素减去其第一个值 在这段代码中 pandas 抱怨 self inferred type 我猜这是循环引用 df Time df Time df Time 0 在这段代码中 pandas 抱怨为
  • python 可以检测它运行在哪个操作系统下吗?

    python 可以检测操作系统 然后为文件系统构建 if else 语句吗 我需要将 Fn 字符串中的 C CobaltRCX 替换为 FileSys 字符串 import os path csv from time import strf
  • 打印 scrapy 请求的“响应”

    我正在尝试学习 scrapy 在遵循教程的同时 我正在尝试进行细微的调整 我想简单地从请求中获取响应内容 然后我会将响应传递到教程代码中 但我无法发出请求并获取响应内容 建议就好 from scrapy http import Respon
  • 如何在 Ubuntu 上安装 Python 模块

    我刚刚用Python写了一个函数 然后 我想将其做成模块并安装在我的 Ubuntu 11 04 上 这就是我所做的 创建 setup py 和 function py 文件 使用 Python2 7 setup py sdist 构建分发文
  • Python 中的六边形自组织映射

    我在寻找六边形 自组织映射 http en wikipedia org wiki Self organizing map在Python上 准备好模块 如果存在的话 绘制六边形单元格的方法 将六边形单元作为数组或其他方式使用的算法 About
  • 使用 Django 的 post_save() 信号

    我有两张桌子 class Advertisement models Model created at models DateTimeField auto now add True author email models EmailField
  • 如何自动替换多个文件的文本内容中的字符?

    我有一个文件夹 myfolder包含许多乳胶表 我需要替换其中每个字符 即替换任何minus sign by an en dash 只是为了确定 我们正在替换连字符INSIDE该文件夹中的所有 tex 文件 我不关心 tex 文件名 手动执
  • 将 subprocess.Popen 的输出通过管道传输到文件

    我需要启动一些长时间运行的进程subprocess Popen 并希望拥有stdout and stderr从每个自动管道到单独的日志文件 每个进程将同时运行几分钟 我想要两个日志文件 stdout and stderr 每个进程当进程运行
  • 唯一的图像哈希值即使 EXIF 信息更新也不会改变

    我正在寻找一种方法来为 python 和 php 中的图像创建唯一的哈希值 我考虑过对原始文件使用 md5 和 因为它们可以快速生成 但是当我更新 EXIF 信息 有时时区关闭 时 它会更改总和 并且哈希也会更改 有没有其他方法可以为这些文
  • 反加入熊猫

    我有两个表 我想附加它们 以便仅保留表 A 中的所有数据 并且仅在其键唯一时添加表 B 中的数据 键值在表 A 和 B 中是唯一的 但在某些情况下键将出现在表 A 和 B 中 我认为执行此操作的方法将涉及某种过滤联接 反联接 以获取表 B
  • Python 中的这种赋值方式叫什么? a = b = 真

    我知道关于元组拆包 http docs python org tutorial datastructures html tuples and sequences但是当一行中有多个等号时 这个赋值被称为什么 阿拉a b True 它总是让我有
  • Python While 循环,and (&) 运算符不起作用

    我正在努力寻找最大公因数 我写了一个糟糕的 运算密集型 算法 它将较低的值减一 使用 检查它是否均匀地划分了分子和分母 如果是 则退出程序 但是 我的 while 循环没有使用 and 运算符 因此一旦分子可整除 它就会停止 即使它不是正确
  • Python unicode 字符代码?

    有没有办法将 Unicode 字符 插入 Python 3 中的字符串 例如 gt gt gt import unicode gt gt gt string This is a full block s unicode charcode U
  • 我可以使用 dask 创建 multivariate_normal 矩阵吗?

    有点相关这个帖子 https stackoverflow com questions 52337612 random multivariate normal on a dask array 我正在尝试复制multivariate norma
  • FastText - 由于 C++ 扩展未能分配内存,无法加载 model.bin

    我正在尝试使用 FastText Python APIhttps pypi python org pypi fasttext https pypi python org pypi fasttext虽然 据我所知 此 API 无法加载较新的
  • 使用 lambda 函数更改属性值

    我可以使用 lambda 函数循环遍历类对象列表并更改属性值 对于所有对象或满足特定条件的对象 吗 class Student object def init self name age self name name self age ag
  • 如何在单独的文件中使用 FastAPI Depends 作为端点/路由?

    我在单独的文件中定义了一个 Websocket 端点 例如 from starlette endpoints import WebSocketEndpoint from connection service import Connectio
  • 将 Scikit-Learn OneHotEncoder 与 Pandas DataFrame 结合使用

    我正在尝试使用 Scikit Learn 的 OneHotEncoder 将 Pandas DataFrame 中包含字符串的列替换为 one hot 编码的等效项 我的下面的代码不起作用 from sklearn preprocessin
  • 从时间序列生成日期特征

    我有一个数据框 其中包含如下列 Date temp data holiday day 01 01 2000 10000 0 1 02 01 2000 0 1 2 03 01 2000 2000 0 3 30 01 2000 200 0 30
  • 如何识别图形线条

    我有以下格式的路径的 x y 数据 示例仅用于说明 seq p1 p2 0 20 2 3 1 20 2 4 2 20 4 4 3 22 5 5 4 22 5 6 5 23 6 2 6 23 6 3 7 23 6 4 每条路径都有多个点 它们

随机推荐

  • 布隆过滤器在cassandra中的作用是什么?

    从 Cassandra 文档的两个不同链接中 我发现 link 1 http docs datastax com en cassandra 3 0 cassandra dml dmlHowDataWritten html 存储在内存中的结构
  • 隐藏包中未记录的函数 - 使用 .function_name?

    我需要在包中提供一些功能 但我不想导出它们或为它们编写文档 我只是将它们隐藏在另一个函数中 但它们需要可供多个函数使用 因此这样做会成为范围界定和维护问题 这样做的正确方法是什么 我的意思是他们是否需要特殊的名字 他们是否会去其他地方 R子
  • 使用 Castle Fluent 接口注册拦截器

    我正在尝试实施通过拦截器 无法弄清楚如何通过流畅的机制注册接口 我看到一个 Component For
  • R/Javascript:崩溃和扩展的网络

    我正在使用 R 编程语言 我有以下图形网络数据 library igraph library visNetwork from lt c Boss TeamA TeamA TeamA SubteamA1 SubteamA1 SubteamA1
  • Trie 节省了空间,但是如何节省空间呢?

    我对 Trie 实现如何节省空间并以最紧凑的形式存储数据感到困惑 如果你看下面的树 当您在任何节点存储字符时 您还需要存储对该字符的引用 因此对于字符串的每个字符 您需要存储其引用 好吧 当常见字符到达时 我们节省了一些空间 但在存储对该字
  • 使用 Cognito 登录 Facebook 时重定向到 URL,但出现错误

    我创建了一个用户投票并将 Facebook 连接到它 这是 AWS 控制台中的外观 我也设置了email作为注册的必需属性 However when I visit my hosted login page and click Contin
  • Gatsby v2 网站无法正确加载 CSS

    在我的开发环境中 该网站看起来符合预期 但是当我运行 gatsby build 时 我的 CSS 无法正确显示 如果我手动导航到另一个页面 则 CSS 按预期显示 没有错误 但我确实收到此警告 资源http localhost 9000 s
  • 播放来自 BLE 的原始音频数据流

    我正在开发一个 iOS 应用程序 我正在接收来自的原始数据流BLE 我在用着AVAudioEngine带缓冲器 let format AVAudioFormat commonFormat AVAudioCommonFormat pcmFor
  • TimeStream + Grafana:无法识别数据中的序列

    在 AWS Timestream 上跳跃 我在 grafana 集成方面遇到了一些问题 我构建了一个查询 返回按天和 事物 分组的事件计数 并希望在图表中显示该结果 甚至哪一个都不重要 In a table the data is disp
  • Java中子进程的重定向I/O(为什么ProcessBuilder.inheritIO()不起作用?)

    我正在按以下方式启动一个流程 try final Process mvnProcess new ProcessBuilder cmd c mvn version directory new File System getProperty u
  • 使用带有指向字符的指针的 scanf 函数

    我写了下面的代码 int main char arrays 12 char pointers scanf s arrays scanf s pointers printf s arrays printf s pointers return
  • 将 KQL 查询使用的所有表名放入 C# 中的列表中

    假设我有一个 KQL 查询 它使用多个表来检索数据 我需要用 C 编写一些代码 它将获取给定 KQL 查询使用的所有表 并将所有这些表名称放入列表中 简而言之 我需要分析每个 KQL 查询以了解它从哪些表获取数据 我已经尝试通过编写以下代码
  • 安装新的 Magento 扩展需要注销/登录,否则您会在管理页面中收到 404

    两个不同的人告诉我 以下是 Magento 的一个已知问题 安装新扩展时 管理员尝试访问 配置扩展程序 并获取 404 页面 去的方法 解决此问题的方法是注销然后登录到他的管理面板 在设计扩展时有没有办法解决这个问题 这方面有一个悬而未决的
  • 使用正则表达式验证 mysql 语句

    我正在用java编写一个程序 在对话框中用户需要输入MySQL SELECT 语句 程序必须验证该语句并继续运行 我的问题是 有没有办法以及如何使用正则表达式验证语句 我需要 仅 正则表达式模式 谢谢 好吧 也许是为了扩展正则表达式 但是对
  • postgresql 中一个非常大的表的分页和过滤(键集分页?)

    我有一个科学数据库 目前有 4 300 000 条记录 它是一个科学数据库 有一个 API 为其提供数据 到 2020 年 6 月 我可能会有大约 100 000 000 条记录 这是表 输出 的布局 ID sensor ID speed
  • Android:可点击的图像视图小部件

    我想做一个非常简单的小部件 它必须仅由一个图像视图组成 1 收到短信时 它应该改变图像 2 点击它也应该改变图像 我尝试使用 ImageButton 进行制作 但失败了 在收到短信的事件上更改图像时出现问题 新图像的比例错误 无论如何 现在
  • mysema 的 Maven apt-get-plugin

    我在 pom xml 中添加了以下代码片段 但在 Eclipse 中执行部分出现错误 Plugin execution not covered by lifecycle configuration com mysema maven mave
  • 气流:找不到 dag_id

    我在不同的 AWS 机器上运行气流服务器和工作线程 我已经在它们之间同步了 dags 文件夹 然后运行airflow initdb在两者上 并在运行时检查 dag id 是否相同airflow list tasks
  • XDocument.Descendants(itemName) - 查找限定名称时出现问题

    我正在尝试从网站读取 XML RSS Feed 因此我使用异步下载并创建一个XDocument与XDocument Parse Method 该文档的目的非常简单 如下所示
  • 气流,在 dag 运行之前标记任务成功或跳过它

    我们有一个巨大的 DAG 其中有许多小而快速的任务和一些大而耗时的任务 我们只想运行 DAG 的一部分 我们发现最简单的方法是不添加我们不想运行的任务 问题是我们的 DAG 有很多相互依赖关系 因此当我们想要跳过某些任务时 不破坏 DAG