如何管理气流 dag 之间的 python 包?

2024-02-24

如果我有多个气流 dags 以及一些重叠的 python 包依赖项,我如何保留每个项目 deps。脱钩?例如。如果我在同一台服务器上有项目 A 和 B,我会用类似的东西运行它们。

source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate

基本上,想在相同的情况下运行 dags(例如,每个 dag 使用可能有重叠的包依赖的 python 脚本。我想单独开发(即,当想要更新时不必使用包更新所有代码)该软件包仅适用于一个项目))。请注意,我一直在使用BashOperator运行 python 任务,例如...

do_stuff = BashOperator(
        task_id='my_task',
        bash_command='python /path/to/script.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)

有办法让它发挥作用吗?气流是否还有其他最佳实践方法可以帮助人们解决(或避免)此类问题?


根据 apache-airflow 邮件列表的讨论,解决我使用各种 python 脚本执行任务的模块化方式的最简单答案是直接为每个脚本或模块调用 virtualenv python 解释器二进制文件,例如。

source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate

会翻译成类似的东西

do_stuff_a = BashOperator(
        task_id='my_task_a',
        bash_command='/path/to/virtualenv_a/bin/python /path/to/script_a.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)
do_stuff_b = BashOperator(
        task_id='my_task_b',
        bash_command='/path/to/virtualenv_b/bin/python /path/to/script_b.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)

在气流中。


关于将参数传递给任务的问题,这取决于您想要传入的参数的性质。在我的例子中,某些参数取决于数据表在 dag 运行当天的样子(例如表中的最高时间戳记录等) .)。为了将这些参数添加到任务中,我有一个在此之前运行的“congif dag”。在配置 dag 中,有一个任务将“真实”dag 的参数生成为 python 字典并转换为 pickle 文件。然后“config” dag 有一个任务,它是TriggerDagRunOperator激活“真实”dag,它具有从“config”dag 生成的 pickle 文件中读取的初始逻辑(在我的例子中,作为Dict)我把它读进去bash_command串状bash_command=f"python script.py {configs['arg1']}".

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何管理气流 dag 之间的 python 包? 的相关文章

随机推荐

  • Spring Boot REST API 的指标收集

    我正在尝试收集我的 Spring Boot 2 1 0 RELEASE 应用程序的指标 具体来说我想知道 调用各个 REST 端点的次数 每个端点处理请求所花费的时间 我的请求被处理 出错的平均速率 执行器 actuator metrics
  • 测量缠绕的绳子

    我正在尝试创建一个控件 它基本上允许我在彼此下面绘制不同的字符串 但是 字符串的宽度不能大于控件的宽度 为了解决这个问题 我正在考虑将 RectangleF 对象传递给 Graphics DrawString 方法 这将包装比传递的矩形宽度
  • array_walk_recursive 与数组?

    我有一个菜单数组 它是一个多维数组 我想对每个项目做一些事情 所以我尝试了 array walk recursive 这是菜单 menu array array name gt a url gt b array name gt c url
  • X 没有实现 Y(...方法有一个指针接收器)

    已经有几个关于此的问答 X 没有实现 Y 方法有一个指针接收器 的事情 但对我来说 他们似乎在谈论不同的事情 并不适用于我的具体情况 因此 我没有将问题变得非常具体 而是将其变得广泛和抽象 似乎有几种不同的情况可能会导致此错误发生 有人可以
  • 从 Chrome 打包应用程序读取和写入本地 sqlite 数据库

    是否可以从 chrome 打包应用程序读取和写入本地 sqlite 文件 我目前已经读取并写入了一个 json 文件 其中包含本地存储在硬盘上的应用程序数据 但我也希望能够使用 sqlite 数据库来执行此操作 我需要它在本地而不是在驱动器
  • 从概念上讲,重玩在游戏中是如何运作的?

    我有点好奇如何在游戏中实现重播 最初 我认为游戏中只会有一个包含每个玩家 人工智能操作的命令列表 然后它会 重新玩 游戏并让引擎照常渲染 然而 我查看了 FPS RTS 游戏的重播 经过仔细检查 甚至像粒子和图形 声音故障之类的东西都是一致
  • 如何编辑嵌入不和谐中的图像?

    是否可以更改嵌入内的图像 我正在尝试重新创建一个我在 Reddit 上看到的 蚀刻草图 机器人 并且想知道它是如何完成的 到目前为止 这是我尝试过的 这是在制作图像的函数内部 code that draws the etch a sketc
  • Next.js 路由器对某些页面上的浏览器后退按钮没有反应

    当浏览器的后退按钮打开时 我遇到了难以调查的错误https gart gallery 如果你去https gart gallery artworks 然后是任何艺术品 例如https gart gallery artworks my pla
  • 使用 jquery 创建会话?

    是否可以使用 jquery 或 javascript 创建会话变量 或者我是否必须使用 ajax 来调用执行此操作的 php 您需要使用服务器请求 Javascript仅在客户端运行 会话数据存储在服务器上 example of passi
  • 将 CSV 文件转换为 TF 记录

    我已经运行我的脚本超过 5 个小时了 我有 258 个 CSV 文件想要转换为 TF Records 我编写了以下脚本 正如我所说 我已经运行它超过 5 个小时了 import argparse import os import sys i
  • JAX-WS 返回复杂对象?

    我对 Java Web 服务还很陌生 但我在任何地方都找不到很好的解释 我在 NetBeans 中有 2 个 Java Web 项目 一种作为 Web 服务 另一种作为该 Web 服务的客户端 我还创建了自己的类 名为 Person 其中包
  • 如何将 updateview 与foreignkey/onetoonefield一起使用

    class ModTool models Model issue models OneToOneField Issue priority models CharField max length 1 choices PRIORITY blan
  • 使用 formControlName 作为反应式形式的自定义输入组件

    有一个自定义输入组件 它以带有验证的反应形式使用 Component moduleId module id toString selector custom select templateUrl custom select componen
  • 根据另一个文件中的顺序对一个文件中的行进行排序

    给定一个文件1 13 a b c d 5 f a c d 7 d c g a 14 a v s d 和一个文件2 7 x 5 c 14 a 13 i 我想考虑 file2 中第一列的相同顺序对 file1 进行排序 以便输出应为 7 d c
  • Matlab mex“缺少依赖共享库”

    我在 Matlab 2017a 中创建了几个 mex 文件 当我使用 Visual Studio C 2017 编译它们时 它们在我的计算机上运行良好 但是 当我尝试在另一台计算机上使用它们时 我收到一条错误消息 Error using m
  • 如何在 SQLAlchemy 中查询关联表?

    我正在尝试将 SQL 查询转换为 SQLAlchemy 查询 以供用户在 get API 内使用 问题是我无法从关联表中查询任何内容 我确信我不知道该方法 ORM roles users db Table roles users db Co
  • 如何从XLS(Excel)文件读取数据[Java,Android]

    我搜索过 stackoverflow 但没有找到明确的答案 如何将 XLS 文件的特定行和列的数据读取到我的 Android 应用程序 如何读取 XLS 文件 我不想将其转换为 CSV 因为当我尝试转换它们时出现错误 也许我可以用这个htt
  • iOS 长宽比限制在 iOS 7 上被打破,在 iOS 8 上工作

    在我的应用程序中 我设置了 UIView 的约束 其高度是使用宽高比约束从其宽度计算出来的 它在各种设备屏幕尺寸 3 5 4 4 7 5 5 的 iOS8 上完美运行 但它会导致应用程序在每个 iOS7 设备上崩溃 我认为这是纵横比约束 因
  • 使用两个不同类型的 Guava ListenableFutures 的结果

    我有两个 ListenableFutures 它们在其他线程上完成 每个未来都有不同的类型 我希望在它们都完成时使用它们的结果 有没有一种优雅的方法来使用番石榴来处理这个问题 如果您想要某种类型安全 您可以执行以下操作 class Comp
  • 如何管理气流 dag 之间的 python 包?

    如果我有多个气流 dags 以及一些重叠的 python 包依赖项 我如何保留每个项目 deps 脱钩 例如 如果我在同一台服务器上有项目 A 和 B 我会用类似的东西运行它们 source path to virtualenv a act