ECS Airflow 1.10.2 performance issues: operators and tasks take 10x longer


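We moved to puckel/Airflow-1.10.2 to try to resolve the poor performance we have seen across multiple environments. We run Airflow 1.10.2 on AWS ECS. Interestingly, CPU and memory utilization never rises above 80%, and the Airflow metadata DB also remains underutilized.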

Below I list the configuration we are using, the DagBag parsing times, and the cProfile output from just running DagBag() in pure Python.

Some of our DAGs import a function from create_subdag_functions.py that returns a DAG, which we use in 12 DAGs. Most DAGs and their corresponding subDAGs run only once an hour, but 1 DAG with 3 subDAGs runs every 10 minutes.

max_threads = 2
dag_dir_list_interval = 300 
dag_concurrency = 16
worker_concurrency = 16
max_active_runs_per_dag = 16
parallelism = 32
executor = CeleryExecutor
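
For anyone who wants to double-check these values against a deployed airflow.cfg, here is a minimal sketch that reads them back with Python's configparser. The section names ([core], [scheduler], [celery]) are an assumption based on how I understand the Airflow 1.10 config layout, since the list above does not state them, and load_limits is a hypothetical helper, not part of Airflow.

```python
import configparser

# Assumed airflow.cfg layout: the section names are not given in the post
# and reflect my understanding of Airflow 1.10.
AIRFLOW_CFG = """
[core]
executor = CeleryExecutor
parallelism = 32
dag_concurrency = 16
max_active_runs_per_dag = 16

[scheduler]
max_threads = 2
dag_dir_list_interval = 300

[celery]
worker_concurrency = 16
"""


def load_limits(cfg_text):
    """Parse the config text and return the settings listed in the post."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return {
        "executor": parser.get("core", "executor"),
        "parallelism": parser.getint("core", "parallelism"),
        "dag_concurrency": parser.getint("core", "dag_concurrency"),
        "max_active_runs_per_dag": parser.getint("core", "max_active_runs_per_dag"),
        "max_threads": parser.getint("scheduler", "max_threads"),
        "dag_dir_list_interval": parser.getint("scheduler", "dag_dir_list_interval"),
        "worker_concurrency": parser.getint("celery", "worker_concurrency"),
    }


limits = load_limits(AIRFLOW_CFG)
for name, value in limits.items():
    print(f"{name} = {value}")
```

To check a real deployment, the same parser could be pointed at the actual file with parser.read(path) instead of read_string.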


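Issues:

  • airflow list_dags -r takes a long time and still runs through the example DAGs even though they are disabled. The time to parse each DAG jumps around.
  • The duration for each DAG is inconsistent (but only for our DAGs, not the examples).
  • There are often large jumps in parse time, for example in the durations of 5 DAGs.
  • When we profile DagBag() with cProfile, we find that DagBag() spends most of its time in the airflow.utils.dag_processing.list_py_paths function, possibly because we have 50+ SQL files in our /usr/local/airflow/dags folder.
  • Looking at landing times, task durations jumped by an order of magnitude between two specific runs. I looked through the logs and found nothing noteworthy between the two runs. I have attached the image at the bottom. This performance loss appeared in Airflow 1.10.0.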
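
To test the guess about the SQL files, a rough diagnostic like the one below could be run inside the container. It is only a sketch using the standard library: it walks the DAGs folder, counts files per extension, and times the walk. It does not reproduce what Airflow's own file listing does, and count_files_by_extension and timed_scan are hypothetical helper names.

```python
import os
import time
from collections import Counter


def count_files_by_extension(folder):
    """Walk the folder recursively and count files per (lower-cased) extension."""
    counts = Counter()
    for _root, _dirs, files in os.walk(folder):
        for name in files:
            ext = os.path.splitext(name)[1].lower() or "(none)"
            counts[ext] += 1
    return counts


def timed_scan(folder):
    """Return the per-extension counts and the wall-clock time of the walk."""
    start = time.perf_counter()
    counts = count_files_by_extension(folder)
    elapsed = time.perf_counter() - start
    return counts, elapsed


if __name__ == "__main__":
    # Path taken from the post; on a machine without it the walk is simply empty.
    dags_folder = "/usr/local/airflow/dags"
    counts, elapsed = timed_scan(dags_folder)
    print(f"Scanned {sum(counts.values())} files in {elapsed:.3f}s")
    for ext, n in counts.most_common():
        print(f"{ext:>8}  {n}")
```

If the walk itself is fast, the slowness is more likely in reading the files than in listing them, which would be consistent with the stat and close calls in the profile, though that is only a hypothesis.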


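What we have tried:

  • Increasing/decreasing max_threads
  • Increasing/removing min_file_process_interval
  • Clearing the Airflow database of all DAGs and reloading
  • Shutting down and redeploying the environment
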
DagBag loading stats for /usr/local/airflow/dags
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 189.77048399999995
file     | duration           | dag_num | task_num | dags
/        | 60.576728          |       1 |       21 | ['dag1']
/        | 55.092603999999994 |       1 |       28 | ['dag2']
/        | 47.997972000000004 |       1 |       17 | ['dag3']
/        | 22.99313           |       3 |       16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/        | 0.67               |       1 |       21 | ['dag5']
/        | 0.652114           |       1 |        9 | ['dag6']
/        | 0.45368            |       1 |       26 | ['dag7']
/        | 0.396908           |       5 |       40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/        | 0.242012           |       6 |       38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/        | 0.134342           |       1 |        1 | ['dag10']
/        | 0.13325            |       2 |        8 | ['dag11', 'dag12.subdag1']
/        | 0.10562            |       1 |        6 | ['dag12']
/        | 0.105292           |       0 |        0 | []
         | 0.040636           |       1 |        6 | ['example_http_operator']
         | 0.005328           |       3 |       15 | ['example_subdag_operator', 'example_subdag_operator.section-1', 'example_subdag_operator.section-2']
         | 0.004052           |       1 |        6 | ['example_bash_operator']
         | 0.003444           |       1 |       11 | ['example_branch_operator']
         | 0.003418           |       1 |        3 | ['example_branch_dop_operator_v3']
         | 0.003222           |       1 |        2 | ['example_passing_params_via_test_command']
         | 0.002386           |       1 |        8 | ['example_skip_dag']
         | 0.002386           |       1 |        1 | ['example_trigger_controller_dag']
         | 0.002344           |       1 |        6 | ['example_short_circuit_operator']
         | 0.002218           |       1 |        6 | ['example_python_operator']
         | 0.002196           |       1 |        2 | ['latest_only']
         | 0.001848           |       1 |        5 | ['latest_only_with_trigger']
         | 0.001722           |       1 |        3 | ['example_xcom']
         | 0.001718           |       0 |        0 | []
         | 0.001704           |       1 |        2 | ['example_trigger_target_dag']
         | 0.00165            |       1 |        3 | ['tutorial']
         | 0.001376           |       1 |        1 | ['test_utils']
         | 0.00103            |       0 |        0 | []
subdags/ | 0.001016           |       0 |        0 | []

DagBag loading stats for /usr/local/airflow/dags
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 296.5826819999999
file | duration           | dag_num | task_num | dags
/    | 74.819988          |       1 |       21 | ['dag1']
/    | 53.193430000000006 |       1 |       17 | ['dag3']
/    | 34.535742          |       5 |       40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/    | 21.543944000000003 |       6 |       38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/    | 18.458316000000003 |       3 |       16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/    | 14.652806000000002 |       0 |        0 | []
/    | 13.051984000000001 |       2 |        8 | ['dag11', 'dag11.subdag1']
/    | 10.02703           |       1 |       21 | ['dag5']
/    | 9.834226000000001  |       1 |        1 | ['dag10']
/    | 9.575258000000002  |       1 |       28 | ['dag2']
/    | 9.418897999999999  |       1 |        9 | ['dag6']
/    | 9.319210000000002  |       1 |        6 | ['dag12']
/    | 8.686964           |       1 |       26 | ['dag7']

Note: the example DAGs were removed from the second output for brevity.
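
To make the jumps between two reports easier to see, the rows can be parsed and compared per DAG. The following is a small sketch, assuming the pipe-separated layout shown in the reports; parse_dagbag_rows and compare_runs are hypothetical helpers, and the sample rows are a few lines copied from the two reports.

```python
import ast


def parse_dagbag_rows(report_text):
    """Return {top-level dag_id: duration} for every data row that lists DAGs."""
    durations = {}
    for line in report_text.splitlines():
        parts = [part.strip() for part in line.split("|")]
        if len(parts) != 5:
            continue  # not a table row
        _file, duration, _dag_num, _task_num, dags = parts
        try:
            seconds = float(duration)
            dag_ids = ast.literal_eval(dags)
        except (ValueError, SyntaxError):
            continue  # header row or malformed line
        if dag_ids:
            durations[dag_ids[0]] = seconds
    return durations


def compare_runs(first, second):
    """Return (dag_id, first, second, ratio) tuples, largest slowdown first."""
    rows = []
    for dag_id in set(first) & set(second):
        if first[dag_id] > 0:
            rows.append((dag_id, first[dag_id], second[dag_id],
                         second[dag_id] / first[dag_id]))
    return sorted(rows, key=lambda row: row[3], reverse=True)


RUN_1 = """
file | duration | dag_num | task_num | dags
/ | 60.576728 | 1 | 21 | ['dag1']
/ | 55.092603999999994 | 1 | 28 | ['dag2']
/ | 0.396908 | 5 | 40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
"""

RUN_2 = """
file | duration | dag_num | task_num | dags
/ | 74.819988 | 1 | 21 | ['dag1']
/ | 34.535742 | 5 | 40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/ | 9.575258000000002 | 1 | 28 | ['dag2']
"""

comparison = compare_runs(parse_dagbag_rows(RUN_1), parse_dagbag_rows(RUN_2))
for dag_id, before, after, ratio in comparison:
    print(f"{dag_id:<6} {before:>10.3f}s -> {after:>10.3f}s  (x{ratio:.2f})")
```

Keying each row by its first DAG id is a simplification that works here because every file defines one parent DAG plus its subDAGs.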

cProfile output of from airflow.models import DagBag; DagBag():

{{}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=6740
{{}} INFO - Using executor SequentialExecutor
{{}} INFO - Filling up the DagBag from 

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      997  443.441    0.445  443.441    0.445 {built-in method}
      198  186.978    0.944  483.629    2.443
      642   65.069    0.101   65.069    0.101 {method 'close' of '_io.BufferedReader' objects}
     1351   45.924    0.034   45.946    0.034 <frozen importlib._bootstrap_external>:830(get_data)
     7916   39.403    0.005   39.403    0.005 {built-in method posix.stat}
      2/1   22.927   11.464  544.419  544.419
       33   18.992    0.576  289.797    8.782
       22    8.723    0.397    8.723    0.397 {built-in method posix.scandir}
      412    2.379    0.006    2.379    0.006 {built-in method posix.listdir}
        9    1.301    0.145    3.058    0.340
 1682/355    0.186    0.000    0.731    0.002
     1255    0.183    0.000    0.183    0.000 {built-in method marshal.loads}
 3092/325    0.143    0.000    0.647    0.002
       59    0.139    0.002    0.139    0.002 {built-in method builtins.compile}
    25270    0.134    0.000    0.210    0.000
    52266    0.132    0.000    0.132    0.000 {method 'append' of 'list' objects}
4210/4145    0.131    0.000    1.760    0.000 {built-in method builtins.__build_class__}
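
For reference, a profile like the one above can be collected with the standard cProfile and pstats modules. This is a minimal sketch rather than the exact command we ran: profile_call is a hypothetical helper, and stand_in_workload is a placeholder so the snippet runs without Airflow installed. In our environment the profiled callable would be DagBag.

```python
import cProfile
import io
import pstats


def profile_call(func, sort_key="tottime", limit=15):
    """Run func under cProfile and return the top rows of the report as text."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        func()
    finally:
        profiler.disable()
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats(sort_key).print_stats(limit)
    return buffer.getvalue()


def stand_in_workload():
    # Placeholder workload. With Airflow available, this would instead be:
    #   from airflow.models import DagBag; DagBag()
    return sum(i * i for i in range(200_000))


report = profile_call(stand_in_workload)
print(report)
```

Sorting by tottime puts the calls that spend the most time in their own body at the top, which matches the ordering of the output above.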




