在 Airflow 中编写和导入自定义插件

2024-04-12

这实际上是两个问题合二为一。

My AIRFLOW_HOME结构如下

airflow
+-- dags
+-- plugins
    +-- __init__.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py
        +-- another_hook.py
    +-- operators
        +-- __init__.py
        +-- my_operator.py
        +-- another_operator.py
    +-- sensors
    +-- utils

我一直在这里关注 astronomer.io 的示例https://github.com/airflow-plugins https://github.com/airflow-plugins。我的定制operators使用我的习惯hooks,并且所有导入都是相对于顶级文件夹的plugins.

# my_operator.py
from plugins.hooks.my_hook import MyHook

但是,当我尝试将整个存储库移动到插件文件夹中时,运行后出现导入错误airflow list_dags这么说plugins找不到。

我读了一些相关内容,显然 Airflow 将插件加载到其核心模块中,以便可以像这样导入它们

# my_operator.py
from airflow.hooks.my_hook import MyHook

所以我将所有导入更改为直接读取airflow.plugin_type反而。不过,我收到另一个导入错误,这次说my_hook找不到。我每次都会重新启动我的工作人员、调度程序和网络服务器,但这似乎不是问题。我查看了类似问题中提出的解决方案,但它们也不起作用。

官方文档也是这样说明的https://airflow.apache.org/plugins.html https://airflow.apache.org/plugins.html的延长AirflowPlugin类,但我不确定这个“接口”应该驻留在哪里。我还更喜欢拖放选项。

最后,我的代码仓库显然没有意义plugins文件夹本身,但如果我将它们分开,测试就会变得不方便。每次在钩子/操作上运行单元测试时,是否都必须修改 Airflow 配置以指向我的存储库?测试自定义插件的最佳实践是什么?


我通过一些试验和错误发现了这一点。这是我的最终结构AIRFLOW_HOME folder

airflow 
+-- dags 
+-- plugins
    +-- __init__.py
    +-- plugin_name.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py 
        +-- another_hook.py 
    +-- operators
        +-- __init__.py
        +-- my_operator.py 
        +-- another_operator.py 
    +-- sensors 
    +-- utils

In plugin_name.py,我扩展AirflowPlugin class

# plugin_name.py

from airflow.plugins_manager import AirflowPlugin
from hooks.my_hook import *
from operators.my_operator import *
from utils.my_utils import *
# etc

class PluginName(AirflowPlugin):

    name = 'plugin_name'

    hooks = [MyHook]
    operators = [MyOperator]
    macros = [my_util_func]

在使用自定义挂钩的自定义运算符中,我将它们导入为

# my_operator.py

from hooks.my_hook import MyHook

然后在我的 DAG 文件中,我可以这样做

# sample_dag.py

from airflow.operators.plugin_name import MyOperator

需要重新启动网络服务器和调度程序。我花了一段时间才弄清楚。

这也有利于测试,因为自定义类中的导入是相对于文件夹中的子模块的plugins。我想知道我是否可以省略__init__.py里面的文件plugins,但由于一切正常,我没有尝试这样做。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Airflow 中编写和导入自定义插件 的相关文章

  • Airflow Worker 没有监听默认的 RabbitMQ 队列

    我已经使用rabbitmq代理配置了Airflow 服务 airflow worker airflow scheduler airflow webserver 正在运行 没有任何错误 调度程序正在推动任务执行default兔子MQ队列 即使
  • Airflow DAG动态结构

    我正在寻找一个可以决定 dag 结构的解决方案当 dag 被触发时因为我不确定我必须运行的操作员数量 请参阅下面我计划创建的执行顺序 Task B 1 Task C 1 Task B 2 Task C 2 Task A Task B 3 g
  • 处理 Airflow DAG 随着时间的变化(DAG 版本控制)

    我们有相对复杂的动态 DAG 作为 ETL 的一部分 DAG 包含数百个转换 它是基于一组 yaml 文件以编程方式创建的 它随着时间的推移而发生变化 添加新任务 任务执行的查询发生变化 甚至任务之间的关系也发生变化 我知道每次以这种方式更
  • 如何在 Apache Airflow 中正确处理夏令时?

    在气流中 一切都应该是 UTC 不受 DST 影响 但是 我们的工作流程可以根据受 DST 影响的时区交付内容 一个示例场景 我们安排了一项作业 开始日期为东部时间上午 8 00 计划间隔为 24 小时 每天东部时间上午 8 点 调度程序会
  • 如何在 Airflow 中使用 HashiCorp Vault?

    我开始使用 Apache Airflow 我想知道如何有效地使其使用存储在 Vault 中的秘密和密码 不幸的是 搜索不会返回超出范围的有意义的答案Airflow 中尚未实现的钩子 https issues apache org jira
  • 在 MWAA 中设置 PYTHONPATH

    我正在尝试在 MWAA 上的 dag 内使用本地模块 文件夹结构如下 init py dags init py my dag init py dag py utils init py file py secrets py date py 我
  • Airflow - 处理 DAG 回调的正确方法

    我有一个DAG然后每当它成功或失败时 我希望它触发一个发布到 Slack 的方法 My DAG args就像下面这样 default args on failure callback slack slack message sad mess
  • 添加到本地主机数据库的气流连接(在 docker 上运行的 postgres)

    我有一个本地运行的 dockerized postgres 我可以通过 pgAdmin4 和 via 连接到它psql 使用相同的连接详细信息 我在 UI 上设置了气流连接 但是 当尝试加载使用该连接的 DAG 时 它会抛出错误 损坏的 D
  • 如何检查何时为特定 dag 安排了下一次 Airflow DAG 运行?

    我已设置气流并运行一些 DAG 安排每天一次 0 0 我想检查下次计划运行特定 dag 的时间 但我看不到可以在管理员中执行此操作的位置 如果你愿意 你可以使用Airflow s CLI 有next execution option htt
  • 如何在 Google Composer 上重新启动气流服务器?

    当我需要在本地重新启动网络服务器时 我会这样做 ps ef grep airflow awk print 2 xargs kill 9 airflow webserver p 8080 D 我如何在 Google Composer 上执行此
  • 使用 Airflow BigqueryOperator 向 BigQuery 表添加标签

    我必须向 bigquery 表添加标签 我知道可以通过 BigQuery UI 来完成此操作 但如何通过气流运算符来完成此操作 Use case 用于计费和搜索目的 由于多个团队在同一项目和数据集下工作 我们需要将各个团队创建的所有表组合在
  • 为什么我的 Airflow 任务被“外部设置为失败”?

    我使用的是 Airflow 2 0 0 我的任务在运行几秒钟或几分钟后偶尔会被 外部 终止 任务通常会成功运行 都是通过以下方式启动的手动任务 airflow tasks test 以及计划的 DAG 运行 所以我相信这与我的 DAG 代码
  • Amazon MWAA Airflow - 任务容器在没有日志的情况下关闭/停止/终止

    我们使用 Amazon MWAA Airflow 很少有任务标记为 FAILED 但根本没有日志 就好像容器在我们没有注意到的情况下被关闭了一样 我找到了这个链接 https cloud google com composer docs h
  • Airflow 默认连接数过多

    我打开气流并检查连接 发现其后面运行的连接太多 关于如何杀死那些我不使用的任何想法 或者我很想知道运行它的最小 conn id 建筑学 LocalExecutor 与其他经纪人不同 Postgres 作为元数据库 但它列出了 17 个连接
  • Airflow 1.10.3 - 空白“最近任务”和“DAG 运行”

    我在 Ubuntu 18 10 上安装了 Airflow 1 10 3 并且能够添加 DAG 并运行它们 但 Web UI 中的 最近任务 和 DAG 运行 为空 我所看到的只是一个黑色虚线圆圈 它不断加载 但什么也没有实现 我最近将 Ai
  • 带子任务的 Airflow 并行任务

    我需要在 Apache Airflow 上运行以下图表 但我遇到了并行步骤的问题 因为它们有多个子步骤 gt task 1a gt tast 1b gt task 4a gt tast 4b Start gt task 2a gt tast
  • 将所有气流连接导出到新环境

    我正在尝试将所有现有的气流连接迁移到新的气流 我正在查看 cli 选项airflow connections help 它提供了列出的选项 但没有提供从 json 格式导出 导入的选项 有没有办法通过 cli airflow ui 跨多个气
  • Airflow Python 单元测试?

    我想为我们的 DAG 添加一些单元测试 但找不到任何单元测试 有 DAG 单元测试框架吗 有一个端到端的测试框架存在 但我猜它已经死了 https issues apache org jira browse AIRFLOW 79 https
  • 如何在Airflow中的PythonOperator的python_callable中提供异步函数?

    我有正在执行的任务 但这些任务通常是异步的 我正在尝试使用 Airflow 运行管道 但它给了我错误 类型错误 无法腌制协程对象 由于这些函数是异步的 我想将它们包装在 asyncio run 中 但仍然不起作用 class Top asy
  • Helm Chart 发布官方气流

    我想知道如何使用官方气流 helm 图表编写 helm 发布 yaml 文件并覆盖 value yaml 文件 我正在尝试使用此配置文件在 kubernetes 集群上部署带有 Flux 的气流 我试过 apiVersion helm fl

随机推荐