如何控制 Airflow 安装的并行性或并发性?

2023-12-24

在我的一些 Apache Airflow 安装中,即使调度程序未完全加载,计划运行的 DAG 或任务也不会运行。如何增加可以同时运行的 DAG 或任务的数量?

同样,如果我的安装处于高负载状态,并且我想限制 Airflow 工作人员拉取排队任务的速度(例如减少资源消耗),我可以调整什么来减少平均负载?


以下是自 Airflow v1.10.2 起可用的配置选项的扩展列表。有些可以在每个 DAG 或每个操作员的基础上进行设置,但如果未指定,也可能会回退到设置范围内的默认值。


可以指定的选项基于每个 DAG:

  • concurrency:允许在设置的 DAG 的所有活动运行中同时运行的任务实例数。默认为core.dag_concurrency如果没有设置
  • max_active_runs:此 DAG 的最大活动运行数。一旦达到此限制,调度程序将不会创建新的活动 DAG 运行。默认为core.max_active_runs_per_dag如果没有设置

例子:

# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)

# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)

可以指定的选项以每个运营商为基础:

  • pool:执行任务的池。Pools https://airflow.apache.org/concepts.html#pools可用于限制并行度只有一个子集任务数
  • max_active_tis_per_dag:控制每个任务跨 dag_runs 的并发运行任务实例的数量。

Example:

t1 = BaseOperator(pool='my_custom_pool', max_active_tis_per_dag=12)

指定的选项贯穿整个 Airflow 设置:

  • core.parallelism:整个 Airflow 安装中运行的最大任务数
  • core.dag_concurrency:每个 DAG 可以运行的最大任务数(跨多个DAG runs)
  • core.non_pooled_task_slot_count:分配给不在池中运行的任务的任务槽数
  • core.max_active_runs_per_dag:活动 DAG 的最大数量runs, 每个 DAG
  • scheduler.max_threads:调度程序进程应使用多少个线程来调度 DAG
  • celery.worker_concurrency:worker 一次处理的最大任务实例数如果使用 CeleryExecutor
  • celery.sync_parallelism:CeleryExecutor 用于同步任务状态的进程数
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何控制 Airflow 安装的并行性或并发性? 的相关文章