运行时添加到DAG的任务无法调度

2023-12-04

我的想法是有一个任务foo它生成输入列表(用户、报告、日志文件等),并为输入列表中的每个元素启动一个任务。目标是利用 Airflow 的重试和其他逻辑,而不是重新实现它。

So, ideally, my DAG should look something like this: enter image description here

这里唯一的变量是生成的任务数量。我想在所有这些完成后执行更多任务,因此为每个任务创建一个新的 DAG 似乎并不合适。

这是我的代码:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1)
}

dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)

foo_operator = BashOperator(
    task_id='foo',
    bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
    xcom_push=True,
    dag=dag)

def gen_nodes(**kwargs):
    ti = kwargs['ti']
    workers = json.loads(ti.xcom_pull(task_ids='foo'))

    for wid in workers:
        print("Iterating worker %s" % wid)
        op = PythonOperator(
            task_id='test_op_%s' % wid,
            python_callable=lambda: print("Dynamic task!"),
            dag=dag
        )

        op.set_downstream(bar_operator)
        op.set_upstream(dummy_op)

gen_subdag_node_op = PythonOperator(
    task_id='gen_subdag_nodes',
    python_callable=gen_nodes,
    provide_context=True,
    dag=dag
)

gen_subdag_node_op.set_upstream(foo_operator)

dummy_op = DummyOperator(
    task_id='dummy',
    dag=dag
)

dummy_op.set_upstream(gen_subdag_node_op)

bar_operator = DummyOperator(
    task_id='bar',
    dag=dag)

bar_operator.set_upstream(dummy_op)

在日志中,我可以看到gen_nodes被正确执行(即Iterating worker 5, ETC)。然而,新任务尚未安排,也没有证据表明它们已执行。

我在网上找到了相关的代码示例,比如这个,但无法使其发挥作用。我错过了什么吗?

或者,是否有更合适的方法来解决这个问题(隔离工作单元)?


目前,airflow 不支持在 dag 运行时添加/删除任务。

工作流程顺序将是 dag 运行开始时评估的任何顺序。

请参阅此处的第二段。

这意味着您无法根据运行中发生的情况添加/删除任务。您可以根据与运行无关的内容在 for 循环中添加 X 任务,但运行开始后不会更改工作流形状/顺序。

很多时候你可以使用BranchPythonOperator在 dag run 期间做出决定,(这些决定可以基于你的xcom值),但它们必须是沿着工作流中已存在的分支进行的决定。

Dag 运行,Dag 定义在气流中以不完全直观的方式分离,但或多或​​少在 dag 运行中创建/生成的任何内容(xcom, dag_run.conf等)不可用于定义 dag 本身。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

运行时添加到DAG的任务无法调度 的相关文章

随机推荐

  • dd-mm-yyyy hh:mm 的正则表达式

    我想查看日期dd mm yyyy hh mm格式 现在我正在使用 0 1 9 1 d 2 d 3 01 0 1 9 1 d 2 d 3 01 19 20 d 2 匹配日期的正则表达式dd mm yyyy format 但我想检查一下dd m
  • 桌面快捷方式的位置存储在哪里?

    Windows桌面快捷方式的位置存储在哪里 我问的是屏幕职位图标的一部分 而不是实际的图标本身 我知道图标本身存储在各种 DLL EXE 等中 这些位置显然存储在某些非易失性存储中 因为它们在重新启动时仍然存在 我的最终目标是编写一个应用程
  • 阻止滚动函数多次触发?

    所以我有一个脚本 可以从数据库中提取数据并在用户接近页面底部时显示它 问题 当用户到达底部时 脚本应该只返回一个帖子 但实际上 正在发出多个请求 导致所有帖子都被快速从数据库中提取 这反过来又以错误的方式返回它们命令 我的问题是 有没有人知
  • 使用 open() 或 creat() 创建文件的权限位设置比我要求的要少

    我正在编写一个程序来模仿 cp 实用程序 但是 我无法获得文件权限以正常工作 我知道它们存储在结构中stat并存储在st mode场与stat 我的问题是我没有获得组或其他类别的写入权限 即我得到 rwxr xr x作为文件的权限 即使源文
  • Java 8 流的 toArray 和 size 参数

    我想知道stream toArray x gt new Integer x 如何知道数组的大小 我写了一个片段 其中创建了一个大小为 4 的整数列表并过滤了值 它创建了一个过滤流长度的数组 我在流上看不到任何方法来获取流的大小 List
  • 使用 Aforge.Net 将灰度图像转换为黑白图像

    我对 Aforge Net 很陌生 我正在寻找一种将灰度图像转换为黑白图像的方法 我真的找不到任何支持吗 我设法通过应用灰度滤镜将普通图像转换为灰度图像 但我找不到任何有关黑白转换的信息 有人可以帮我一下吗 使用阈值类别将图像转换为黑白图像
  • Java applet 在 Web 上的可用性

    对于我们大学的电子学习项目 我们使用 Java 小程序在 Ilias 学习单元内显示一些交互式内容 例如一些交互式函数绘图或一些简单的问答系统或其他交互式元素以及这些元素的任意组合 我们决定使用 Java 因为我们想要一个开放的解决方案 当
  • Git:使一个分支与另一个分支完全相同

    我对 Git 比较陌生 而且仍然不太适应 现在 我正在寻找可以使当前分支看起来像另一个分支的命令 选项 魔法 也就是说 合并它们 但当出现冲突时 始终选择要合并到当前分支中的差异 我的情况是这样的 我在 master 分支上有一个稳定的应用
  • d3js 动态 csv 从下拉列表切换

    我是 d3js 的新手 我在这里成功使用了示例 https gist github com d3noob 4414436 这非常好 因为它可以远程工作 CSV 存储在服务器端 现在 我想更进一步 添加从列表中选择源 CSV 并重新绘制图表的
  • # 和 ## 运算符在 C 中做什么? [复制]

    这个问题在这里已经有答案了 类似 NAME 或 NAME 它们在C中是什么意思 我在关于宏的 GCC 文档中看到了它们 运算符 连接两个参数 它们之间不留空格 define printe a b a b printe c out lt lt
  • 动态导入 JavaScript

    请问将 JavaScript js 文件动态导入父 JavaScript 代码的正确方法是什么 我正在使用以下代码 但它似乎不正确 function loadjscssfile filename filetype if filename i
  • 类型错误:“图像”对象不可使用 PIL 进行下标

    我正在尝试在 google colab 上开发一个网络应用程序 我想在这个 Web 应用程序中使用我之前训练过的模型制作一个图像分类器 当我在 Web 应用程序中从浏览器中选择要分类的图像时 出现以下错误 TypeError Image o
  • 从 NSMutable 字典中分离键和对象,并使用 sqlite 的插入命令中的值

    大家好 我正在 iPhone 中开发一个 sqlite 应用程序 因为我是这个应用程序的新手 所以我不知道如何在 sqlite 的插入语句命令中使用 NSMutableDictionary 中的键和对象 例如 我想要以下格式的插入语句 插入
  • 直接在javascript函数中调用C#方法

    如何在 javascript 函数中直接调用 C 方法 例如page load页面后面代码的方法 请帮我 要在客户端事件上调用服务器端方法 您需要执行以下操作 1 创建服务器端方法 void DoSomething 2 实施System W
  • 如何从 Perl 脚本运行“source”命令(Linux)?

    我在尝试着source来自 Perl 脚本 script pl 的脚本 system source some generic script 请注意 这个通用脚本可以是 shell python 或任何其他脚本 另外 我无法将这个通用脚本中存
  • ElasticSearch 5.x 上下文建议器 NEST .Net

    我正在尝试使用 ElasticSearch 5 1 2 上的 Nest 5 0 创建一个带有上下文建议器的索引 目前 我可以创建映射 elasticClient MapAsync
  • 在 Java 8 中查找列表的最大值、最小值、总和和平均值

    如何在Java 8中找到以下列表中数字的最大值 最小值 总和和平均值 List
  • 本地主机上的 Angular2 CORS 问题[重复]

    这个问题在这里已经有答案了 Failed to load URL Response to preflight request doesn t pass access control check No Access Control Allow
  • 当许多客户端连接时,我的 socket.io 服务器开始随机断开客户端连接(由于“ping 超时”原因)

    我正在构建一个网站 我的客户端通过网络套接字与服务器进行通信 我在后端使用 Nodejs 因此使用著名的 socket io 库进行 Web 套接字通信 问题 1 到 40 个客户端一切正常 之后服务器开始随机断开客户端连接 一开始我认为这
  • 运行时添加到DAG的任务无法调度

    我的想法是有一个任务foo它生成输入列表 用户 报告 日志文件等 并为输入列表中的每个元素启动一个任务 目标是利用 Airflow 的重试和其他逻辑 而不是重新实现它 So ideally my DAG should look someth