将字符串列表作为 Airflow 中相关任务的参数传递

2023-11-23

我正在尝试通过以下方式将字符串列表从一个任务传递到另一个任务XCom但我似乎无法将推送列表解释回列表。

例如,当我在某些函数中执行此操作时blah这是运行在ShortCircuitOperator:

paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
kwargs['ti'].xcom_push(key='return_value', value=full_paths)

然后我想使用这样的列表作为运算符的参数。例如,

run_task_after_blah = AfterBlahOperator(
    task_id='run-task-after-blah',
    ...,
    input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
    ...,
)

我预计input_paths等于paths但事实并非如此,因为渲染首先发生,然后分配,并且模板渲染在某种程度上将转换xcom_pull返回到一个字符串化的列表(以及此后我的AfterBlahOperatorinserts 将其指定为 JSON 中元素的值。

我尝试连接paths分成由某个分隔符分隔的一个字符串,并将其推送到 XCom,然后在从 XCom 拉出时将其拆分回来,但当 XCom 首先渲染时,我得到,要么字符串化的列出当split函数在模板或原始连接字符串内调用paths if the split函数应用于参数(如"{{ ti.xcom_pull(task_ids='find-paths') }}".split(';').

当提取的值可以进一步处理时,XCom 似乎非常适合作为任务参数的单个值或多个值,但不适用于将 multiple_values 转换为“一个”作为任务参数。

有没有一种方法可以做到这一点,而不必编写一个额外的函数来精确返回这样的字符串列表? 或者也许我滥用了 XCom 太多,但 Airflow 中有许多运算符将元素列表作为参数(例如,通常是多个文件的完整路径,这些文件是某些先前任务的结果,因此事先不知道)。


Jinja 渲染字符串,因此如果您通过模板获取 XCom,它始终是一个字符串。相反,您需要获取您有权访问的 XComTaskInstance目的。像这样的东西:

class AfterBlahOperator(BaseOperator):

    def __init__(self, ..., input_task_id, *args, **kwargs):
        ...
        self.input_task_id = input_task_id
        super(AfterBlahOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        input_paths = context['ti'].xcom_pull(task_ids=self.input_task_id)
        for path in input_paths:
            ...

这类似于您在 a 中获取它的方式PythonOperator,其中XCom 文档提供一个例子。

请注意,您仍然可以支持单独的input_paths当参数可以在 DAG 中硬编码时,您只需要进行额外检查即可确定从哪个参数读取值。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将字符串列表作为 Airflow 中相关任务的参数传递 的相关文章

  • Pandas:将增量数字添加到一列的重复值的后缀,这些重复值按另一列的值分组并按索引排序

    我试图将下划线和增量数字添加到按索引排序的任何重复值以及由另一列定义的组内 例如 我希望 化学 列中的重复值具有下划线和增量数字 并按索引排序并按 循环 列分组 df pd DataFrame 1 1 1 1 1 1 2 2 2 2 2 2
  • Python Pandas 滚动聚合一列列表

    我有一个简单的数据框 df 和一列列表lists 我想根据以下内容生成一个附加列lists The df好像 import pandas as pd lists 1 1 2 1 2 3 3 2 9 7 9 4 2 7 3 5 create
  • 导入错误:无法导入名称“FFProbe”

    我无法获取ffprobe包 https github com simonh10 ffprobe在 Python 3 6 中工作 我使用 pip 安装它 但是当我输入import ffprobe it says Traceback most
  • 如何同时运行多个功能[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我有以下代码 my func1 my func2 my func3 my func4 my func5 是否可以同时计算函数的数据 而
  • Python 不考虑 distutils.cfg

    我已经尝试了给出的所有内容 并且所有教程都指向相同的方向 即使用 mingw 作为 python 而不是 Visual C 中的编译器 我确实有 Visual C 和 mingw 当我想使用 pip 安装时 问题开始出现 它总是给Unabl
  • Python:json_normalize pandas 系列给出 TypeError

    我在 pandas 系列中有数万行像这样的 json 片段df json IDs lotId 1 Id 123456 date 2009 04 17 bidsCount 2 IDs lotId 2 Id 123456 date 2009 0
  • Python3将模块从文件夹导入到另一个文件夹

    我的结构字典是 mainFolder folder1 init py file1 py file2 py folder2 init py file3 py file4 py setup py init py 我需要将 file4 py 从f
  • 如何用函数记录一个文件?

    我有一个带有函数 lib py 但没有类的python 文件 每个函数都有以下样式 def fnc1 a b c This fonction does something param a lalala type a str param b
  • Python speedtest.net,或等效的[关闭]

    Closed 这个问题是无关 help closed questions 目前不接受答案 是否有一个 Python 库可以实现 SpeedTest net 测试或等效的互联网连接速度测试 GitHub上有一个项目叫速度检查 https gi
  • 在 Python 中从 Excel 复制 YEARFRAC() 函数

    因此 我使用 python 来自动执行一些必须在 Excel 中执行的重复任务 我需要做的计算之一需要使用yearfrac 这在Python中被复制了吗 I found this https lists oasis open org arc
  • 我可以用关闭的文件对象做什么?

    当您打开文件时 它存储在一个打开的文件对象中 该对象使您可以访问该文件的各种方法 例如读取或写入 gt gt gt f open file0 gt gt gt f
  • 如何从 python 脚本执行 7zip 命令

    我试图了解如何使用 os system 模块来执行 7zip 命令 现在我不想用 Popen 或 subprocess 让事情变得复杂 我已经安装了 7zip 并将 7zip exe 复制到我的用户文件夹中 我只想提取我的测试文件 inst
  • Python脚本从字母和两个字母组合生成单词

    我正在编写一个简短的脚本 它允许我使用我设置的参数生成所有可能的字母组合 例如 b a 参数 单词 5 个字母 第三 第五个字母 b a 第一个字母 ph sd nn mm 或 gh 第二 第四个字母 任意元音 aeiouy 和 rc 换句
  • 如何将 URL 添加到 Telegram Bot 的 InlineKeyboardButton

    我想制作一个按钮 可以从 Telegram 聊天中在浏览器中打开 URL 外部超链接 目前 我只开发了可点击的操作按钮 update message reply text Subscribe to us on Facebook and Te
  • 如何创建增量加载网页

    我正在编写一个处理大量数据的页面 它会永远持续到我的结果页面加载 几乎无限 因为返回的数据太大了 因此 我需要实现一个增量加载页面 例如 url 中的页面 http docs python org http docs python org
  • AWS 将 MQTT 消息存储到 DynamoDB

    我构建了一个定期发送 MQTT 消息的 python 脚本 这是发送到后端的 JSON 字符串 Id 1234 Ut 1488395951 Temp 22 86 Rh 48 24 在后端 我想将 MQTT 消息存储到 DynamoDB 表中
  • SQLAlchemy 与 count、group_by 和 order_by 使用 ORM

    我有几个函数需要使用 count group by 和 order by 进行一对多连接 我使用 sqlalchemy select 函数生成一个查询 该查询将返回一组 id 然后我对其进行迭代以对各个记录执行 ORM 选择 我想知道是否有
  • PyObjC + Python 3.0 问题

    默认情况下 Cocoa Python 应用程序使用默认的 Python 运行时版本 2 5 如何配置我的 Xcode 项目以便它使用较新的 Python 3 0 运行时 我尝试用新版本替换项目中包含的Python framework 但它不
  • Tkinter 将鼠标点击绑定到框架

    我一定错过了一些明显的东西 我的 Tkinter 程序中有两个框架 每个框架在网格布局中都有一堆标签 我想将鼠标点击绑定到其中一个而不是另一个 我目前使用 root bind
  • 使用 python 将 CSV 文件上传到 Microsoft Azure 存储帐户

    我正在尝试上传一个 csv使用 python 将文件写入 Microsoft Azure 存储帐户 我已经发现C sharp https blogs msdn microsoft com jmstall 2012 08 03 convert

随机推荐

  • 在 OCUnit 中使用核心数据类时出现 Apple Mach-O 链接器错误

    好的 这是我的测试类中的代码 NSManagedObjectContext managedObjectContextWithConcurrencyType NSManagedObjectContextConcurrencyType conc
  • 什么是 CSRF 代币?它的重要性是什么?它是如何运作的?

    我正在编写一个应用程序 Django 确实如此 我只想了解 CSRF 令牌 实际上是什么以及它如何保护数据 如果不使用CSRF token 发布的数据会不安全吗 简单来说 跨站请求伪造 CSRF 假设您当前已登录网上银行 www myban
  • 在 MySql 中将 VARCHAR 转换为 DECIMAL 值

    我已将包含字符串值 例如吃 和浮动值 例如 0 87 的 CSV 文件导入到我的 phpMyAdmin 数据库中的表中 在处理完所有字符串值并仅保留具有十进制值的行后 我需要将这些值从 VARCHAR 转换为 DECIMAL FLOAT 以
  • 定点的反平方根

    我正在寻找定点 16 16 数字的最佳反平方根算法 下面的代码是我到目前为止所拥有的 但基本上它取平方根并除以原始数字 我想得到不除法的倒数平方根 如果它发生任何改变 代码将为armv5te编译 uint32 t INVSQRT uint3
  • 为什么使用 static_cast(x) 而不是 (T)x?

    我听说static cast函数应该优先于 C 风格或简单函数风格的转换 这是真的 为什么 主要原因是经典的 C 类型转换不区分我们所说的static cast lt gt reinterpret cast lt gt const cast
  • 确定为特定函数调用分派哪个方法

    我试图理解一些我没有编写的代码 plot gam在 mgcv 中 并且有一个调用plot 函数带有一些我不认识的奇怪参数 例如 P 我想弄清楚这次调用正在调度哪个绘图方法 findMethod 和类似的功能没有帮助 我认为情节是S3 我尝试
  • php shell_exec 权限被拒绝

    shell exec touch Users Nerses Downloads ads txt 2 gt 1 我的 PHP exec shel exec 函数有问题 它说我没有执行该命令的权限 怎样才能打开这些权限呢 您的 PHP 代码正在
  • sql 2008 中没有索引的表列表

    如何列出 SQL 2008 数据库中没有索引的表 Edit我想要架构名称和表名称 这应该涵盖您正在寻找的内容 即堆表 无聚集索引 并且没有任何非聚集索引 它使用新的系统 2005 2008 年使用的表对象 此外 您可能想要查找具有聚集索引但
  • 添加属性时,如何保留 .NET 程序集的 COM 二进制兼容性?

    我们开发了一个 NET 程序集来存储语言翻译信息 并且需要由 VB6 应用程序使用 我们希望能够更改翻译信息而无需重新编译应用程序 翻译由名为 LanguageServices 的两个文件部分类提供 一个文件是不变的库方法 另一个文件是从
  • 如何在Cypress.io(电子/铬)中设置浏览器语言? [复制]

    这个问题在这里已经有答案了 我的问题是关于配置Cypress以某种语言启动浏览器实例 为了 对本地化 i18n 文本标签进行断言 检查 i18n 功能 在语言之间切换 绕过持续集成 CI CD 的问题 例如 本地计算机 浏览器默认为fr F
  • 如何将 Xcode 降级到以前的版本?

    我偶尔需要使用 Xcode 现在遇到一个问题 我已经升级到 Xcode 4 6 但我使用的另一个软件不支持它 所以我需要返回到 Xcode 4 5 我不习惯 Mac 的一般工作方式 因此如果所提供的答案可以在编写时考虑到这一点 那将会很有帮
  • Resharper Intellisense 可以配置为按字母顺序排序吗?

    我最近更新为使用 VS2013 和 Resharper 8 2 从 VS2010 和 Resharper 6 并发现了一个非常烦人的 feature 因为 Resharper 的智能感知不会按字母顺序对类成员进行排序 举例来说 我有一个名为
  • .NET 中的自定义文化感知日期格式

    在 NET 中 用于格式化 DateTime 值的大多数标准字符串都是区域性感知的 例如 ShortDatePattern d 格式字符串根据当前区域性切换年 月 日部分的顺序 6 15 2009 1 45 30 PM gt 6 15 20
  • PHP 作为 FastCGI 应用程序运行 (php-cgi) - 如何发出并发请求?

    编辑 更新 向下滚动 编辑2 更新 问题已解决 Some background information 我正在用 Java 编写自己的网络服务器 几天前我询问 Apache 与 PHP 的接口到底如何 这样我就可以实现 PHP 支持 我了解
  • .NET 中工作线程和 I/O 线程的简单描述

    在 NET 中很难找到工作线程和 I O 线程的详细但简单的描述 我对这个主题的了解很清楚 但技术上可能不准确 工作线程是这样的线程should使用 CPU 来完成工作 I O 线程 也称为 完成端口线程 should使用设备驱动程序来完成
  • 如何在 Numpy/MatplotLib 中可视化线性规划(具有任意不等式)的可行区域?

    我需要实现线性规划问题的求解器 所有限制都是 5x 10y 这些限制可以是任意数量的 另外 x gt 0 y gt 0 隐式 我需要找到最佳解决方案 最大值 并在 matplotlib 中显示可行区域 我通过实施单纯形法找到了最佳解决方案
  • Unwind Segue 在 Swift 3 和 iOS 10 中不起作用

    我正在尝试在 iOS 10 和 Swift 3 中测试 unwind segue I made a simple app like this 我添加了 segue 的代码TableViewController类并连接 取消 按钮并退出表视图
  • 从文本文件中删除^M字符的Shell命令[重复]

    这个问题在这里已经有答案了 可能的重复 删除 Unix 中的回车符 我正在读取外部第三方生成的一些数据 我注意到文件中的 ASCII 文本中散布着 M 字符 我认为这是 ASCII 中的字符 13 表示不带换行符的回车符 是否有一个行可以用
  • 如何在android中使用SharedPreference存储图像?

    我想使用 SharedPreference 在 android 中保存图像 我有两个活动类 当我单击第一个活动的按钮时 它将调用第二个活动 第二个活动在列表视图中显示我的首选名称 并将 Android 壁纸重置为我设置为首选壁纸的图像在第一
  • 将字符串列表作为 Airflow 中相关任务的参数传递

    我正在尝试通过以下方式将字符串列表从一个任务传递到另一个任务XCom但我似乎无法将推送列表解释回列表 例如 当我在某些函数中执行此操作时blah这是运行在ShortCircuitOperator paths gs format bucket