我正在尝试通过以下方式将字符串列表从一个任务传递到另一个任务XCom但我似乎无法将推送列表解释回列表。
例如,当我在某些函数中执行此操作时blah
这是运行在ShortCircuitOperator
:
paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
kwargs['ti'].xcom_push(key='return_value', value=full_paths)
然后我想使用这样的列表作为运算符的参数。例如,
run_task_after_blah = AfterBlahOperator(
task_id='run-task-after-blah',
...,
input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
...,
)
我预计input_paths
等于paths
但事实并非如此,因为渲染首先发生,然后分配,并且模板渲染在某种程度上将转换xcom_pull
返回到一个字符串化的列表(以及此后我的AfterBlahOperator
inserts 将其指定为 JSON 中元素的值。
我尝试连接paths
分成由某个分隔符分隔的一个字符串,并将其推送到 XCom,然后在从 XCom 拉出时将其拆分回来,但当 XCom 首先渲染时,我得到,要么字符串化的列出当split
函数在模板或原始连接字符串内调用paths
if the split
函数应用于参数(如"{{ ti.xcom_pull(task_ids='find-paths') }}".split(';')
.
当提取的值可以进一步处理时,XCom 似乎非常适合作为任务参数的单个值或多个值,但不适用于将 multiple_values 转换为“一个”作为任务参数。
有没有一种方法可以做到这一点,而不必编写一个额外的函数来精确返回这样的字符串列表?
或者也许我滥用了 XCom 太多,但 Airflow 中有许多运算符将元素列表作为参数(例如,通常是多个文件的完整路径,这些文件是某些先前任务的结果,因此事先不知道)。