我正在尝试在 Airflow 中运行一个 DAG,把数据集导入(ingest)到 Google Cloud Storage。
这是 DAG 脚本:
import os
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from google.cloud import storage
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator
# Deployment settings read from the environment (None when the variable is unset).
PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET = os.getenv("GCP_GCS_BUCKET")

# File to ingest and the URL it is downloaded from.
dataset_file = "yellow_tripdata_2021-01.parquet"
dataset_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/" + dataset_file

# Settings that fall back to a default when the environment does not provide them.
path_to_local_home = os.getenv("AIRFLOW_HOME", "/opt/airflow/")
BIGQUERY_DATASET = os.getenv("BIGQUERY_DATASET", "trips_data_all")
# NOTE: the upload takes about 20 minutes at an upload speed of 800 kbps;
# a faster uplink shortens it.
def upload_to_gcs(bucket, object_name, local_file):
    """Upload a local file to a Google Cloud Storage bucket.

    Ref: https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python

    :param bucket: GCS bucket name
    :param object_name: target path & file-name inside the bucket
    :param local_file: source path & file-name on the local disk
    :return: None
    """
    # WORKAROUND to prevent timeouts for files > 6 MB on an 800 kbps uplink:
    # override the storage library's multipart threshold and default chunk size.
    # (Ref: https://github.com/googleapis/python-storage/issues/74)
    five_megabytes = 5 * 1024 * 1024
    storage.blob._MAX_MULTIPART_SIZE = five_megabytes
    storage.blob._DEFAULT_CHUNKSIZE = five_megabytes
    # End of workaround

    target_blob = storage.Client().bucket(bucket).blob(object_name)
    target_blob.upload_from_filename(local_file)
# Arguments handed to the DAG through its ``default_args`` parameter.
default_args = dict(
    owner="airflow",
    start_date=days_ago(1),
    depends_on_past=False,
    retries=1,
)
# NOTE: DAG declaration - using a Context Manager (an implicit way).
# Pipeline: download the dataset locally -> upload it to GCS -> register it
# as a BigQuery external table.
with DAG(
    dag_id="data_ingestion_gcs_dag",
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    tags=['dtc-de'],
) as dag:

    # Download the dataset into the local Airflow home directory.
    # `-f` makes curl exit non-zero on HTTP errors (4xx/5xx); without it an
    # error page would be written to the .parquet file and the task would
    # still be reported as successful.
    download_dataset_task = BashOperator(
        task_id="download_dataset_task",
        bash_command=f"curl -sSLf {dataset_url} > {path_to_local_home}/{dataset_file}"
    )

    # Upload the downloaded file to the bucket under the raw/ prefix.
    local_to_gcs_task = PythonOperator(
        task_id="local_to_gcs_task",
        python_callable=upload_to_gcs,
        op_kwargs={
            "bucket": BUCKET,
            "object_name": f"raw/{dataset_file}",
            "local_file": f"{path_to_local_home}/{dataset_file}",
        },
    )

    # Create a BigQuery external table that points at the uploaded object.
    bigquery_external_table_task = BigQueryCreateExternalTableOperator(
        task_id="bigquery_external_table_task",
        table_resource={
            "tableReference": {
                "projectId": PROJECT_ID,
                "datasetId": BIGQUERY_DATASET,
                "tableId": "external_table",
            },
            "externalDataConfiguration": {
                "sourceFormat": "PARQUET",
                "sourceUris": [f"gs://{BUCKET}/raw/{dataset_file}"],
            },
        },
    )

    # Each task runs only after the previous one has succeeded.
    download_dataset_task >> local_to_gcs_task >> bigquery_external_table_task
当我触发这个 DAG 时,它一直停留在 running(运行中)状态,并且任务日志始终如下:
*** Reading local file: /opt/airflow/logs/data_ingestion_gcs_dag/download_dataset_task/2022-09-28T11:15:38.340982+00:00/2.log
[2022-09-28, 11:31:29 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: data_ingestion_gcs_dag.download_dataset_task manual__2022-09-28T11:15:38.340982+00:00 [queued]>
[2022-09-28, 11:31:29 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: data_ingestion_gcs_dag.download_dataset_task manual__2022-09-28T11:15:38.340982+00:00 [queued]>
[2022-09-28, 11:31:29 UTC] {taskinstance.py:1238} INFO -
--------------------------------------------------------------------------------
[2022-09-28, 11:31:29 UTC] {taskinstance.py:1239} INFO - Starting attempt 2 of 3
[2022-09-28, 11:31:29 UTC] {taskinstance.py:1240} INFO -
--------------------------------------------------------------------------------
[2022-09-28, 11:31:29 UTC] {taskinstance.py:1259} INFO - Executing <Task(BashOperator): download_dataset_task> on 2022-09-28 11:15:38.340982+00:00
[2022-09-28, 11:31:29 UTC] {standard_task_runner.py:52} INFO - Started process 2532 to run task
[2022-09-28, 11:31:29 UTC] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'data_ingestion_gcs_dag', 'download_dataset_task', 'manual__2022-09-28T11:15:38.340982+00:00', '--job-id', '33', '--raw', '--subdir', 'DAGS_FOLDER/data_ingestion_gcs_dag.py', '--cfg-path', '/tmp/tmpxin03j36', '--error-file', '/tmp/tmp1zqihjcz']
[2022-09-28, 11:31:29 UTC] {standard_task_runner.py:77} INFO - Job 33: Subtask download_dataset_task
[2022-09-28, 11:31:29 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: data_ingestion_gcs_dag.download_dataset_task manual__2022-09-28T11:15:38.340982+00:00 [running]> on host b46ac29d5228
[2022-09-28, 11:31:29 UTC] {taskinstance.py:1700} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.UndefinedColumn: column xcom.execution_date does not exist
LINE 1: ...g' AND xcom.task_id = 'download_dataset_task' AND xcom.execu...
^
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1329, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1415, in _execute_task_with_callbacks
self.clear_xcom_data()
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 792, in clear_xcom_data
session=session,
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 323, in clear
return query.delete()
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3926, in delete
delete_op.exec_()
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
self._do_exec()
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1930, in _do_exec
self._execute_stmt(delete_stmt)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
return conn.execute(stmt, self._params)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
distilled_params,
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column xcom.execution_date does not exist
LINE 1: ...g' AND xcom.task_id = 'download_dataset_task' AND xcom.execu...
^
[SQL: DELETE FROM xcom WHERE xcom.dag_id = %(dag_id_1)s AND xcom.task_id = %(task_id_1)s AND xcom.execution_date = %(execution_date_1)s]
[parameters: {'dag_id_1': 'data_ingestion_gcs_dag', 'task_id_1': 'download_dataset_task', 'execution_date_1': datetime.datetime(2022, 9, 28, 11, 15, 38, 340982, tzinfo=Timezone('UTC'))}]
(Background on this error at: http://sqlalche.me/e/13/f405)
[2022-09-28, 11:31:29 UTC] {taskinstance.py:1277} INFO - Marking task as UP_FOR_RETRY. dag_id=data_ingestion_gcs_dag, task_id=download_dataset_task, execution_date=20220928T111538, start_date=20220928T113129, end_date=20220928T113129
[2022-09-28, 11:31:29 UTC] {standard_task_runner.py:92} ERROR - Failed to execute job 33 for task download_dataset_task
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.UndefinedColumn: column xcom.execution_date does not exist
LINE 1: ...g' AND xcom.task_id = 'download_dataset_task' AND xcom.execu...
^
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1329, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1415, in _execute_task_with_callbacks
self.clear_xcom_data()
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 792, in clear_xcom_data
session=session,
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 323, in clear
return query.delete()
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3926, in delete
delete_op.exec_()
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
self._do_exec()
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1930, in _do_exec
self._execute_stmt(delete_stmt)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
return conn.execute(stmt, self._params)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
distilled_params,
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column xcom.execution_date does not exist
LINE 1: ...g' AND xcom.task_id = 'download_dataset_task' AND xcom.execu...
^
[SQL: DELETE FROM xcom WHERE xcom.dag_id = %(dag_id_1)s AND xcom.task_id = %(task_id_1)s AND xcom.execution_date = %(execution_date_1)s]
[parameters: {'dag_id_1': 'data_ingestion_gcs_dag', 'task_id_1': 'download_dataset_task', 'execution_date_1': datetime.datetime(2022, 9, 28, 11, 15, 38, 340982, tzinfo=Timezone('UTC'))}]
(Background on this error at: http://sqlalche.me/e/13/f405)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.UndefinedColumn: column "execution_date" of relation "task_fail" does not exist
LINE 1: INSERT INTO task_fail (task_id, dag_id, execution_date, star...
^
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
args.func(args, dag=self.dag)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
_run_raw_task(args, ti)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 184, in _run_raw_task
error_file=args.error_file,
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1386, in _run_raw_task
self.handle_failure(e, test_mode, error_file=error_file, session=session)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1755, in handle_failure
session.flush()
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2540, in flush
self._flush(objects)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2682, in _flush
transaction.rollback(_capture_exception=True)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
with_traceback=exc_tb,
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2642, in _flush
flush_context.execute()
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
uow,
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
insert,
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1136, in _emit_insert_statements
statement, params
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
distilled_params,
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column "execution_date" of relation "task_fail" does not exist
LINE 1: INSERT INTO task_fail (task_id, dag_id, execution_date, star...
^
[SQL: INSERT INTO task_fail (task_id, dag_id, execution_date, start_date, end_date, duration) VALUES (%(task_id)s, %(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(duration)s) RETURNING task_fail.id]
[parameters: {'task_id': 'download_dataset_task', 'dag_id': 'data_ingestion_gcs_dag', 'execution_date': datetime.datetime(2022, 9, 28, 11, 15, 38, 340982, tzinfo=Timezone('UTC')), 'start_date': datetime.datetime(2022, 9, 28, 11, 31, 29, 633052, tzinfo=Timezone('UTC')), 'end_date': datetime.datetime(2022, 9, 28, 11, 31, 29, 734536, tzinfo=Timezone('UTC')), 'duration': 0}]
(Background on this error at: http://sqlalche.me/e/13/f405)
[2022-09-28, 11:31:29 UTC] {local_task_job.py:154} INFO - Task exited with return code 1
[2022-09-28, 11:31:29 UTC] {taskinstance.py:1277} INFO - Marking task as UP_FOR_RETRY. dag_id=data_ingestion_gcs_dag, task_id=download_dataset_task, execution_date=20220928T111538, start_date=20220928T113129, end_date=20220928T113129
我已经尝试了我能想到的一切办法。我用 Docker 重新构建了 Airflow 镜像,也尝试过修改代码,但都无济于事,始终出现这个错误。
顺便说一句:我使用的是最新版本的 Airflow。