在 Airflow 1.8 中,任务失败时如何让 DAG 自动重新运行?

2023-11-27

当前配置如下:

default_args = {
    ...
    'retries': 1,
    'retry_delay': timedelta (seconds = 1),
    ...
} 

我可以让失败的任务自动重试多次,但如何在任务最终失败后让整个 DAG 重新运行?

当然,最好是能自动完成。


您可以另外运行一个“失败检查”DAG:借助 provide_session 工具获取数据库会话,查询 task_id 为目标任务且状态为 failed 的所有任务实例并将其清除。然后,还可以选择清除其下游任务,并将对应 DagRun 的状态设置为 running,让调度器重新执行它们。

from datetime import datetime, timedelta
from sqlalchemy import and_
import json

from airflow import DAG
from airflow.models import TaskInstance, DagRun
from airflow.utils.db import provide_session

from airflow.operators.python_operator import PythonOperator


# Shared task settings: retry twice, two minutes apart, and request an e-mail
# on failure (note the recipient list is empty, so fill it in to get alerts).
default_args = dict(
    start_date=datetime(2018, 6, 11),
    retries=2,
    retry_delay=timedelta(minutes=2),
    email=[],
    email_on_failure=True,
)

# Maintenance DAG that runs the failure check once a day; catchup is disabled
# so past intervals since start_date are not back-filled.
dag = DAG(
    '__RESET__FAILED_TASKS',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
)


@provide_session
def check_py(session=None, relevant_task_id='relevant_task_id',
             target_dag_id=None, downstream_task_ids=(), **kwargs):
    """Reset failed instances of one task so the scheduler runs them again.

    Finds every TaskInstance whose ``task_id`` is ``relevant_task_id`` and
    whose state is ``'failed'``. For each one, it deletes the failed row and,
    optionally, the listed downstream task instances of the same DAG run. It
    then sets that DagRun's state back to ``'running'``.

    Extra parameters can be supplied from the operator via
    ``PythonOperator(op_kwargs={...})``.

    :param session: SQLAlchemy session (filled in by ``provide_session``
        when the caller omits it).
    :param relevant_task_id: task_id whose failed instances are reset.
    :param target_dag_id: if given, only instances of this DAG are reset;
        by default a failed ``relevant_task_id`` in *any* DAG matches.
    :param downstream_task_ids: task_ids to clear in the same DAG run as each
        reset instance, so they are re-run after it.
    :param kwargs: Airflow template context (``provide_context=True``); unused.
    """
    import logging  # local import keeps this block self-contained

    criteria = [TaskInstance.task_id == relevant_task_id,
                TaskInstance.state == 'failed']
    if target_dag_id is not None:
        criteria.append(TaskInstance.dag_id == target_dag_id)

    failed = session.query(TaskInstance).filter(and_(*criteria)).all()

    # `.all()` returns a (possibly empty) list, never None. Having nothing to
    # reset is the healthy case, so log it rather than failing (and retrying /
    # e-mailing about) this maintenance task.
    if not failed:
        logging.info('No failed Task Instances of %s exist.', relevant_task_id)
        return

    # Read the identifying keys straight from the ORM objects *before* the
    # rows are deleted; TaskInstance carries dag_id and execution_date itself.
    runs = {(ti.dag_id, ti.execution_date) for ti in failed}

    # Clear the failed instances and push the deletes to the database now, so
    # the bulk statements below see a consistent state.
    for ti in failed:
        session.delete(ti)
    session.flush()

    for dag_id, execution_date in runs:
        # OPTIONAL: clear the requested downstream tasks in the same DAG run.
        if downstream_task_ids:
            (session
             .query(TaskInstance)
             .filter(and_(TaskInstance.dag_id == dag_id,
                          TaskInstance.execution_date == execution_date,
                          TaskInstance.task_id.in_(list(downstream_task_ids))))
             .delete(synchronize_session=False))

        dag_run = (session
                   .query(DagRun)
                   .filter(and_(DagRun.dag_id == dag_id,
                                DagRun.execution_date == execution_date))
                   .first())

        # `.first()` yields None when no DagRun matches; skip that run rather
        # than crashing on `None.set_state`.
        if dag_run is None:
            logging.warning('No DagRun found for %s at %s; skipping.',
                            dag_id, execution_date)
            continue

        dag_run.set_state('running')

with dag:
    # The DAG's only task: call check_py, passing the Airflow template
    # context to it as keyword arguments (provide_context=True).
    run_check = PythonOperator(
        task_id='run_check',
        python_callable=check_py,
        provide_context=True,
    )
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在气流 1.8 上失败时如何重新启动 dag? 的相关文章

随机推荐