为了使其发挥作用,您必须定义您期望的字段Operator
as a template_field
。我做了这个工作示例:
class CustomDummyOperator(BaseOperator):
template_fields = ('msg_from_previous_task',)
@apply_defaults
def __init__(self,
msg_from_previous_task,
*args, **kwargs) -> None:
super(CustomDummyOperator, self).__init__(*args, **kwargs)
self.msg_from_previous_task = msg_from_previous_task
def execute(self, context):
print(f"Message: {self.msg_from_previous_task}")
DAG:
dag = DAG(
'xcom_arg_custom_op',
schedule_interval="@once",
start_date=days_ago(2),
default_args={'owner': 'airflow'},
tags=['example'],
catchup=False
)
def return_a_str():
return "string_value_from_op1"
task_1 = PythonOperator(
task_id='task_1',
dag=dag,
python_callable=return_a_str,
)
task_2 = CustomDummyOperator(
task_id='task_2',
dag=dag,
msg_from_previous_task=task_1.output
)
task_1 >> task_2
输出日志:
[2021-05-25 13:51:50,848] {taskinstance.py:1255} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=xcom_arg_custom_op
AIRFLOW_CTX_TASK_ID=task_2
AIRFLOW_CTX_EXECUTION_DATE=2021-05-23T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-05-23T00:00:00+00:00
Message: string_value_from_op1
在幕后我们正在使用str() https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/xcom_arg/index.html#airflow.models.xcom_arg.XComArg.__str__的方法XComArg
它为常规(“非任务流”) 运营商。
让我知道这是否对你有用!