ECS Airflow 1.10.2 性能问题。操作员和任务花费的时间延长 10 倍

2023-12-28

我们迁移到 puckel/Airflow-1.10.2,以尝试解决我们在多种环境中遇到的性能不佳的问题。我们在 AWS ECS 上的 ECS Airflow 1.10.2 上运行。有趣的是,CPU/mem 永远不会跳到 80% 以上。 Airflow metadb 也仍未得到充分利用。

下面我列出了我们正在使用的配置、DagBag 解析时间以及来自cProfile刚刚运行的输出DagBag()在纯Python中。

我们的一些 DAG 导入了一个函数create_subdag_functions.py返回我们在 12 个 DAG 中使用的 DAG。大多数 DAG 及其相应的子DAG 仅每小时运行一次,但 1 个 DAG / 3 个子DAG 每 10 分钟运行一次。

max_threads = 2
dag_dir_list_interval = 300 
dag_concurrency = 16
worker_concurrency = 16
max_active_runs_per_dag = 16
parallelism = 32
executor = CeleryExecutor

一些观察结果:

  • airflow list_dags -r即使示例 DAG 被禁用,也会花费很长时间并耗尽它们。解析每个 DAG 的时间会跳跃。
  • 每个 DAG 的持续时间不一致(但这仅适用于我们的 DAG,不适用于示例)
  • 解析时间通常会有很大的跳跃。例如5 个 dags 的持续时间
  • 当我们介绍DagBag()与 cProfile 函数一起我们发现 DagBag() 大部分时间都花在airflow.utils.dag_processing.list_py_paths功能可能是由于我们的 /usr/local/airflow/dags 文件夹中有 50 多个 sql 文件
  • 纵观着陆时间,任务时间在两次特定运行之间跳跃了一个数量级。我尝试查看日志等,两次运行之间没有什么值得注意的地方。我已将图像附加在底部。此性能损失出现在 Airflow 1.10.0 中

我尝试过的解决方案:

  • 增加/减少max_threads
  • 增加/消除min_file_process_interval
  • 清除所有 DAG 的气流数据库并重新加载
  • 关闭并重新部署环境
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 189.77048399999995
--------------------------------------------+--------------------+---------+----------+------------------------------------------------------------------------------------------------------------
--------------------------------------------+--------------------+---------+----------+------------------------------------------------------------------------------------------------------------
/dag1.py                                    | 60.576728          |       1 |       21 | ['dag1']
/dag2.py                                    | 55.092603999999994 |       1 |       28 | ['dag2']
/dag3.py                                    | 47.997972000000004 |       1 |       17 | ['dag3']
/dag4.py                                    | 22.99313           |       3 |       16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/dag5.py                                    | 0.67               |       1 |       21 | ['dag5']
/dag6.py                                    | 0.652114           |       1 |        9 | ['dag6']
/dag7.py                                    | 0.45368            |       1 |       26 | ['dag7']
/dag8.py                                    | 0.396908           |       5 |       40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/dag9.py                                    | 0.242012           |       6 |       38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/dag10.py                                   | 0.134342           |       1 |        1 | ['dag10']
/dag11.py                                   | 0.13325            |       2 |        8 | ['dag11', 'dag12.subdag1']
/dag12.py                                   | 0.10562            |       1 |        6 | ['dag12']
/create_subdag_functions.py                 | 0.105292           |       0 |        0 | []
 example_http_operator.py                   | 0.040636           |       1 |        6 | ['example_http_operator']
 example_subdag_operator.py                 | 0.005328           |       3 |       15 | ['example_subdag_operator', 'example_subdag_operator.section-1', 'example_subdag_operator.section-2']
 example_bash_operator.py                   | 0.004052           |       1 |        6 | ['example_bash_operator']
 example_branch_operator.py                 | 0.003444           |       1 |       11 | ['example_branch_operator']
 example_branch_python_dop_operator_3.py    | 0.003418           |       1 |        3 | ['example_branch_dop_operator_v3']
 example_passing_params_via_test_command.py | 0.003222           |       1 |        2 | ['example_passing_params_via_test_command']
 example_skip_dag.py                        | 0.002386           |       1 |        8 | ['example_skip_dag']
 example_trigger_controller_dag.py          | 0.002386           |       1 |        1 | ['example_trigger_controller_dag']
 example_short_circuit_operator.py          | 0.002344           |       1 |        6 | ['example_short_circuit_operator']
 example_python_operator.py                 | 0.002218           |       1 |        6 | ['example_python_operator']
 example_latest_only.py                     | 0.002196           |       1 |        2 | ['latest_only']
 example_latest_only_with_trigger.py        | 0.001848           |       1 |        5 | ['latest_only_with_trigger']
 example_xcom.py                            | 0.001722           |       1 |        3 | ['example_xcom']
 docker_copy_data.py                        | 0.001718           |       0 |        0 | []
 example_trigger_target_dag.py              | 0.001704           |       1 |        2 | ['example_trigger_target_dag']
 tutorial.py                                | 0.00165            |       1 |        3 | ['tutorial']
 test_utils.py                              | 0.001376           |       1 |        1 | ['test_utils']
 example_docker_operator.py                 | 0.00103            |       0 |        0 | []
 subdags/subdag.py                          | 0.001016           |       0 |        0 | []
-------------------------------------------------------------------------------------------------------+--------------------+---------+----------+--------------------------------------------------
-------------------------------------------------------------------
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 296.5826819999999
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
file                          | duration           | dag_num | task_num | dags
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/dag1.py                      | 74.819988          |       1 |       21 | ['dag1']
/dag3.py                      | 53.193430000000006 |       1 |       17 | ['dag3']
/dag8.py                      | 34.535742          |       5 |       40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/dag4.py                      | 21.543944000000003 |       6 |       38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/dag5.py                      | 18.458316000000003 |       3 |       16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/create_subdag_functions.py   | 14.652806000000002 |       0 |        0 | []
/dag7.py                      | 13.051984000000001 |       2 |        8 | ['dag11', 'dag11.subdag1']
/dag8.py                      | 10.02703           |       1 |       21 | ['dag5']
/dag9.py                      | 9.834226000000001  |       1 |        1 | ['dag10']
/dag10.py                     | 9.575258000000002  |       1 |       28 | ['dag2']
/dag11.py                     | 9.418897999999999  |       1 |        9 | ['dag6']
/dag12.py                     | 9.319210000000002  |       1 |        6 | ['dag12']
/dag13.py                     | 8.686964           |       1 |       26 | ['dag7']

注意:为简洁起见,从第二个输出中删除了示例 DAG

cProfile 输出from airflow.models import DagBag; DagBag():

{{settings.py:174}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=6740
{{__init__.py:51}} INFO - Using executor SequentialExecutor
{{models.py:273}} INFO - Filling up the DagBag from 

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      997  443.441    0.445  443.441    0.445 {built-in method io.open}
      198  186.978    0.944  483.629    2.443 zipfile.py:198(is_zipfile)
      642   65.069    0.101   65.069    0.101 {method 'close' of '_io.BufferedReader' objects}
     1351   45.924    0.034   45.946    0.034 <frozen importlib._bootstrap_external>:830(get_data)
     7916   39.403    0.005   39.403    0.005 {built-in method posix.stat}
      2/1   22.927   11.464  544.419  544.419 dag_processing.py:220(list_py_file_paths)
       33   18.992    0.576  289.797    8.782 models.py:321(process_file)
       22    8.723    0.397    8.723    0.397 {built-in method posix.scandir}
      412    2.379    0.006    2.379    0.006 {built-in method posix.listdir}
        9    1.301    0.145    3.058    0.340 linecache.py:82(updatecache)
 1682/355    0.186    0.000    0.731    0.002 sre_parse.py:470(_parse)
     1255    0.183    0.000    0.183    0.000 {built-in method marshal.loads}
 3092/325    0.143    0.000    0.647    0.002 sre_compile.py:64(_compile)
       59    0.139    0.002    0.139    0.002 {built-in method builtins.compile}
    25270    0.134    0.000    0.210    0.000 sre_parse.py:253(get)
    52266    0.132    0.000    0.132    0.000 {method 'append' of 'list' objects}
4210/4145    0.131    0.000    1.760    0.000 {built-in method builtins.__build_class__}

气流性能下降:


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ECS Airflow 1.10.2 性能问题。操作员和任务花费的时间延长 10 倍 的相关文章

  • InterfaceError:连接已关闭(使用 django + celery + Scrapy)

    当我在 Celery 任务中使用 Scrapy 解析函数 有时可能需要 10 分钟 时 我得到了这个信息 我用 姜戈 1 6 5 django celery 3 1 16 芹菜 3 1 16 psycopg2 2 5 5 我也使用了psyc
  • 将字符串转换为带有毫秒和时区的日期时间 - Python

    我有以下 python 片段 from datetime import datetime timestamp 05 Jan 2015 17 47 59 000 0800 datetime object datetime strptime t
  • Pycharm Python 控制台不打印输出

    我有一个从 Pycharm python 控制台调用的函数 但没有显示输出 In 2 def problem1 6 for i in range 1 101 2 print i end In 3 problem1 6 In 4 另一方面 像
  • 如何在android上的python kivy中关闭应用程序后使服务继续工作

    我希望我的服务在关闭应用程序后继续工作 但我做不到 我听说我应该使用startForeground 但如何在Python中做到这一点呢 应用程序代码 from kivy app import App from kivy uix floatl
  • 导入错误:没有名为 _ssl 的模块

    带 Python 2 7 的 Ubuntu Maverick 我不知道如何解决以下导入错误 gt gt gt import ssl Traceback most recent call last File
  • 如何使用 Scrapy 从网站获取所有纯文本?

    我希望在 HTML 呈现后 可以从网站上看到所有文本 我正在使用 Scrapy 框架使用 Python 工作 和xpath body text 我能够获取它 但是带有 HTML 标签 而且我只想要文本 有什么解决办法吗 最简单的选择是ext
  • 从 scikit-learn 导入 make_blobs [重复]

    这个问题在这里已经有答案了 我收到下一个警告 D Programming Python ML venv lib site packages sklearn utils deprecation py 77 DeprecationWarning
  • SQLAlchemy 与 celery 的会话问题

    我已经为我们的网络应用程序安排了一些使用 celerybeat 重复执行的任务 该应用程序本身是使用金字塔网络框架构建的 使用 zopetransaction 扩展来管理会话 在 celery 中 我将该应用程序用作库 我正在使用函数重新定
  • Python 中的二进制缓冲区

    在Python中你可以使用StringIO https docs python org library struct html用于字符数据的类似文件的缓冲区 内存映射文件 https docs python org library mmap
  • python pandas 中的双端队列

    我正在使用Python的deque 实现一个简单的循环缓冲区 from collections import deque import numpy as np test sequence np array range 100 2 resha
  • Python - 按月对日期进行分组

    这是一个简单的问题 起初我认为很简单而忽略了它 一个小时过去了 我不太确定 所以 我有一个Python列表datetime对象 我想用图表来表示它们 x 值是年份和月份 y 值是此列表中本月发生的日期对象的数量 也许一个例子可以更好地证明这
  • 如何改变Python中特定打印字母的颜色?

    我正在尝试做一个简短的测验 并且想将错误答案显示为红色 欢迎来到我的测验 您想开始吗 是的 祝你好运 法国的首都是哪里 法国 随机答案不正确的答案 我正在尝试将其显示为红色 我的代码是 print Welcome to my Quiz be
  • Nuitka 未使用 nuitka --recurse-all hello.py [错误] 编译 exe

    我正在尝试通过 nuitka 创建一个简单的 exe 这样我就可以在我的笔记本电脑上运行它 而无需安装 Python 我在 Windows 10 上并使用 Anaconda Python 3 我输入 nuitka recurse all h
  • 如何将 PIL 图像转换为 NumPy 数组?

    如何转换 PILImage来回转换为 NumPy 数组 这样我就可以比 PIL 进行更快的像素级转换PixelAccess允许 我可以通过以下方式将其转换为 NumPy 数组 pic Image open foo jpg pix numpy
  • 在Python中重置生成器对象

    我有一个由多个yield 返回的生成器对象 准备调用该生成器是相当耗时的操作 这就是为什么我想多次重复使用生成器 y FunctionWithYield for x in y print x here must be something t
  • 设置 torch.gather(...) 调用的结果

    我有一个形状为 n x m 的 2D pytorch 张量 我想使用索引列表来索引第二个维度 可以使用 torch gather 完成 然后然后还设置新值到索引的结果 Example data torch tensor 0 1 2 3 4
  • 如何从没有结尾的管道中读取 python 中的 stdin

    当管道来自 打开 时 不知道正确的名称 我无法从 python 中的标准输入或管道读取数据 文件 我有作为例子管道测试 py import sys import time k 0 try for line in sys stdin k k
  • 在 Pandas DataFrame Python 中添加新列[重复]

    这个问题在这里已经有答案了 例如 我在 Pandas 中有数据框 Col1 Col2 A 1 B 2 C 3 现在 如果我想再添加一个名为 Col3 的列 并且该值基于 Col2 式中 如果Col2 gt 1 则Col3为0 否则为1 所以
  • Python 分析:“‘select.poll’对象的‘poll’方法”是什么?

    我已经使用 python 分析了我的 python 代码cProfile模块并得到以下结果 ncalls tottime percall cumtime percall filename lineno function 13937860 9
  • PyAudio ErrNo 输入溢出 -9981

    我遇到了与用户相同的错误 Python 使用 Pyaudio 以 16000Hz 录制音频时出错 https stackoverflow com questions 12994981 python error audio recording

随机推荐

  • 以编程方式在给定时间暂停/停止 Android MediaPlayer

    我研究了一下 但找不到解决这个问题的任何方法 我想玩一个MediaPlayer并在给定时间暂停 停止 即 从第 6 秒播放到第 17 秒 我知道我可以设置它的起点seekTo 方法 但我可以通过设置终点来暂停 停止播放 当然 在达到文件结束
  • 读取文件列表,应用函数并用相同名称重写

    我有一组包含重复条目的 csv 文件 我需要删除并重写具有相同名称和格式的文件 这是我到目前为止所做的 filenames lt list files pattern csv datalist lt lapply filenames fun
  • 如何在 IIS 中监视 .NET MySQL 数据连接器的连接池

    我已经在谷歌上搜索了相当多的信息 但无法找到确切的答案 我们在日志中看到以下错误 超时已过 获取之前已过了超时时间 来自池的连接 发生这种情况的原因可能是所有 连接正在使用中并且已达到最大池大小 堆栈跟踪 位于 MySql Data MyS
  • MXE - 使用 cmake 和 mingw 交叉编译时对 Qt 的未定义引用

    我正在尝试编译电子通桌面 https github com electronpass electronpass desktop 对于 Windows 使用MXE http mxe cc在Linux上 我已经成功编译了它的所有依赖项 包括li
  • 使用表的字段值重命名 SQL 表的列

    我正在尝试执行一个 SQL 查询 该查询将使用表中第一个记录集中的文本重命名表的列 我的桌子看起来像这样 COL1 COL2 COL3 COL4 COL5 COL6 REASON ITEMDATE ITEMTIME SITENAME EVE
  • 将数据上传到共享内存中用于卷积核

    我在理解评论中提到的批量加载时遇到一些困难 为了计算像素中的卷积 大小为 5 的掩模必须以该特定像素为中心 图像被分成图块 应用卷积掩模后的这些图块是最终输出图块 其大小为TILE WIDTH TILE WIDTH 对于属于输出图块边界的像
  • Powershell HomeDirectory 未在文件服务器文件系统上创建

    我在使用 Powershell 并通过 cmdlet 设置 HomeDirectory 时遇到了一个奇怪的问题 Set ADUser Identity user HomeDirectory fileserver home user 即使 c
  • Grails Spring 安全登录问题:/auth?login_error=1

    我成功安装了 SpringSecurity 在用户注册并使用 Spring Security UI 的 RegisterController 闭包验证用户后 我可以看到用户已使用 springSecurityService reauthen
  • WordPress 远程导致“此网页有重定向循环”错误

    我有一个 WordPress 网站 在开发过程中运行得非常好 位于mysite dev 但是当我将其部署到远程服务器时 mysite com 它抛出 此网页有重定向循环 error 我可以在加载栏中看到浏览器一次又一次地尝试 www mys
  • 为菜鸟提供使用仪器泄漏的建议

    你好 我对 iPhone 开发还很陌生 我第一次使用 Instruments 中的 Leaks 运行我的应用程序 它向我展示了大约 20 个泄漏 最小的是 32 字节 还有一个 1KB 我遵循了内存管理指南 我 认为我 了解如何以及何时使用
  • ASP.NET 部分页面上传,无需 Updatepanel /使用 jQuery

    我有一个 ASPX 页面 在顶部我显示 5 个类别 例如 笔 书 鞋 手机 镜子 当我单击任何类别时 我想在标题下方显示该类别下的产品 我不想为此重新加载整个页面 当单击发生时 除了图像的中心位置 可能是要显示的 DIV 或表格 我想保持页
  • 使用 mod_rewrite 进行 Apache ssl 重定向

    我想这样做 如果他们这样做https example com我想将他们重定向到https www example com 添加www 我尝试了很多事情都无济于事 Redirect https example com
  • React isValidElement 结果为 false

    这是一个简单的例子 const Foo gt return div foo div class Bar extends React Component render return div bar div console log React
  • Selenium如何访问同一css类的两个控件

    我正在使用 selenium ide 进行测试 我的目标是验证以下内容 1 文本框的最大和最小长度属性 2 验证标签文字 我的html代码如下 div class control group div
  • 如何在网络视图中启用受保护的内容?

    在chrome浏览器中 有一个选项播放受保护的内容 我如何启用相同的功能webview in android 我尝试过一种称为allowContentAccess 但这不起作用 请帮忙 要允许 Web 视图播放 DRM 内容 您必须向 We
  • 如何将货币字符串格式化为浮点值?

    我得到的字符串是这样的 2 000 00 现在我想将此字符串更改为浮点值 如何将此字符串转换为浮点型 实际上 此字符串 2 000 00 来自使用货币格式化程序格式化数字 但现在我想将此字符串更改为浮点值 请为此指导任何人 提前致谢 考虑字
  • Rails:Psych,从 0.1.4 更新 libyaml

    我需要安装 capybara webkit 它需要 qt 库 所以我使用 homebrew 并使用以下命令安装了它们 brew update brew install qt brew linkapps 然后我捆绑了 capybara web
  • 我可以从表示层访问存储库吗?

    我从 DDD 开始 我对 DDD 应用程序中涉及的几个层之间的交互有点困惑 我可以从表示层调用我的存储库吗 如果不是 我是否必须在我的服务层中复制存储库提供的 CRUD 功能 当然 服务层将反过来使用这些功能的存储库 最好的方法是什么 表示
  • Android 翻译动画 - 使用 AnimationListener 将 View 永久移动到新位置

    I have android translate Animation I have an ImageView with random generated position next1 next2 I am calling void ever
  • ECS Airflow 1.10.2 性能问题。操作员和任务花费的时间延长 10 倍

    我们迁移到 puckel Airflow 1 10 2 以尝试解决我们在多种环境中遇到的性能不佳的问题 我们在 AWS ECS 上的 ECS Airflow 1 10 2 上运行 有趣的是 CPU mem 永远不会跳到 80 以上 Airf