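I am using Airflow 2.0.0, and my tasks are sporadically being killed "externally" after running for a few seconds or minutes. The tasks generally run successfully (both manual runs started with airflow tasks test ... and scheduled DAG runs), so I believe this is not related to my DAG code.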
When a task fails, this seems to be the key error in the task log:
{local_task_job.py:170} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2020-12-20 11:26:11,448] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [queued]>
[2020-12-20 11:26:11,473] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [queued]>
[2020-12-20 11:26:11,473] {taskinstance.py:1017} INFO -
--------------------------------------------------------------------------------
[2020-12-20 11:26:11,473] {taskinstance.py:1018} INFO - Starting attempt 3 of 3
[2020-12-20 11:26:11,473] {taskinstance.py:1019} INFO -
--------------------------------------------------------------------------------
[2020-12-20 11:26:11,506] {taskinstance.py:1038} INFO - Executing <Task(PythonOperator): run_backupper> on 2020-12-19T02:00:00+00:00
[2020-12-20 11:26:11,509] {standard_task_runner.py:51} INFO - Started process 12059 to run task
[2020-12-20 11:26:11,515] {standard_task_runner.py:75} INFO - Running: ['airflow', 'tasks', 'run', 'daily_backups', 'run_backupper', '2020-12-19T02:00:00+00:00', '--job-id', '22', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/backupper/daily_backups.py', '--cfg-path', '/tmp/tmpnfmqtorg']
[2020-12-20 11:26:11,517] {standard_task_runner.py:76} INFO - Job 22: Subtask run_backupper
[2020-12-20 11:26:11,609] {logging_mixin.py:103} INFO - Running <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [running]> on host localhost
[2020-12-20 11:26:11,742] {taskinstance.py:1232} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=<user>
AIRFLOW_CTX_DAG_ID=daily_backups
AIRFLOW_CTX_TASK_ID=run_backupper
AIRFLOW_CTX_EXECUTION_DATE=2020-12-19T02:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2020-12-19T02:00:00+00:00
...
... my job's logs, indicating that the job is running healthily ...
...
[2020-12-20 11:26:16,587] {local_task_job.py:170} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2020-12-20 11:26:16,593] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 12059
[2020-12-20 11:27:16,609] {process_utils.py:108} WARNING - process psutil.Process(pid=12059, name='airflow task runner: daily_backups run_backupper 2020-12-19T02:00:00+00:00 22', status='sleeping', started='11:26:11') did not respond to SIGTERM. Trying SIGKILL
[2020-12-20 11:27:16,618] {process_utils.py:61} INFO - Process psutil.Process(pid=12059, name='airflow task runner: daily_backups run_backupper 2020-12-19T02:00:00+00:00 22', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='11:26:11') (12059) terminated with exit code Negsignal.SIGKILL
[2020-12-20 11:27:16,618] {local_task_job.py:118} INFO - Task exited with return code Negsignal.SIGKILL
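The last few lines of the log are not consistent between failures. Here is a different version, from an earlier failed attempt of the same task: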
... same stuff as before ...
[2020-12-20 02:01:12,689] {local_task_job.py:170} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2020-12-20 02:01:12,695] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 24442
[2020-12-20 02:02:00,462] {taskinstance.py:1214} ERROR - Received SIGTERM. Terminating subprocesses.
[2020-12-20 02:02:00,498] {process_utils.py:61} INFO - Process psutil.Process(pid=24442, status='terminated', exitcode=0, started='02:00:10') (24442) terminated with exit code 0
[2020-12-20 02:02:00,499] {local_task_job.py:118} INFO - Task exited with return code 0
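I suspect that in this second case the script was able to respond to SIGTERM in time, whereas in the first case it was blocked on a long-running query and could not terminate cleanly.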
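To illustrate what I mean by responding to SIGTERM in time, here is a minimal standalone sketch (not my actual task code, and not Airflow's own signal handling). CPython only runs a Python-level signal handler between bytecode instructions in the main thread, so a single long blocking call can delay the reaction, while work split into small chunks notices the signal quickly. The names stop_requested, install_handler and run_in_chunks are hypothetical and exist only for this example. Note that the task process in my logs already reacts to SIGTERM itself (the "Received SIGTERM. Terminating subprocesses." line), so replacing the handler inside a real task is probably not what you want; this only demonstrates the timing behaviour.

```python
import os
import signal
import threading

# Hypothetical flag for this sketch: set once SIGTERM has been received.
stop_requested = threading.Event()


def _handle_sigterm(signum, frame):
    # Runs in the main thread, but only once the interpreter gets a
    # chance to execute Python bytecode again.
    stop_requested.set()


def install_handler():
    # Must be called from the main thread of the process.
    signal.signal(signal.SIGTERM, _handle_sigterm)


def run_in_chunks(chunks, process_chunk):
    """Process chunks one by one, stopping early once SIGTERM was seen.

    Returns the list of chunks that were actually processed.
    """
    done = []
    for chunk in chunks:
        if stop_requested.is_set():
            break
        process_chunk(chunk)
        done.append(chunk)
    return done
```

If process_chunk were one long-running query instead of many short ones, the flag would not be checked until that query returned, which would match the first log, where the process did not exit within the 60 seconds between SIGTERM and SIGKILL.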