如何在 Airflow 2.x 中将 XComArg 转换为字符串值?

2024-02-20

Code:

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class GCSUploadOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        bucket_name,
        target_file_name,
        data_as_str,
        gcp_conn_id="google_cloud_default",
        *args,
        **kwargs,
    ):
        super(GCSUploadOperator, self).__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.data_as_str = data_as_str
        self.gcp_conn_id = gcp_conn_id
        self.target_file_name = target_file_name

    def execute(self, context):
        hook = GCSHook(self.gcp_conn_id)
        hook.upload(
            bucket_name=self.bucket_name,
            object_name=context["execution_date"].strftime(
                f"year=2022/month=%m/day=%d/{self.target_file_name}"
            ),
            data=self.data_as_str,
        )

numbers = PythonOperator(task_id="numbers", python_callable=lambda: "abcde")
gcs = GCSUploadOperator(
    task_id="upload_content_to_GCS",
    bucket_name=BUCKET_NAME,
    target_file_name=f"{STORE_KEY_CONTENT}.json",
    data_as_str=?????????,   # I need to pass a string result of previous task
)

我尝试过的目的data_as_str:

    gcs = GCSUploadOperator(
        task_id="upload_content_to_GCS",
        bucket_name=BUCKET_NAME,
        target_file_name=f"{STORE_KEY_CONTENT}.json",
        data_as_str=numbers
    )
    --> TypeError: <Task(PythonOperator): numbers> could not be converted to bytes

    gcs = GCSUploadOperator(
        task_id="upload_content_to_GCS",
        bucket_name=BUCKET_NAME,
        target_file_name=f"{STORE_KEY_CONTENT}.json",
        data_as_str=numbers.output
    )
    --> TypeError: <airflow.models.xcom_arg.XComArg object at 0x7f6e8ed76760> could not be converted to bytes

任何想法?


为了使其发挥作用,您必须定义您期望的字段Operator as a template_field。我做了这个工作示例:


class CustomDummyOperator(BaseOperator):
    template_fields = ('msg_from_previous_task',)

    @apply_defaults
    def __init__(self,
                 msg_from_previous_task,
                 *args, **kwargs) -> None:
        super(CustomDummyOperator, self).__init__(*args, **kwargs)
        self.msg_from_previous_task = msg_from_previous_task

    def execute(self, context):
        print(f"Message: {self.msg_from_previous_task}")

DAG:

dag = DAG(
    'xcom_arg_custom_op',
    schedule_interval="@once",
    start_date=days_ago(2),
    default_args={'owner': 'airflow'},
    tags=['example'],
    catchup=False

)

def return_a_str():
    return "string_value_from_op1"

task_1 = PythonOperator(
    task_id='task_1',
    dag=dag,
    python_callable=return_a_str,
)

task_2 = CustomDummyOperator(
    task_id='task_2',
    dag=dag,
    msg_from_previous_task=task_1.output
)

task_1 >> task_2

输出日志:

[2021-05-25 13:51:50,848] {taskinstance.py:1255} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=xcom_arg_custom_op
AIRFLOW_CTX_TASK_ID=task_2
AIRFLOW_CTX_EXECUTION_DATE=2021-05-23T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-05-23T00:00:00+00:00
Message: string_value_from_op1

在幕后我们正在使用str() https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/xcom_arg/index.html#airflow.models.xcom_arg.XComArg.__str__的方法XComArg它为常规(“非任务流”) 运营商。

让我知道这是否对你有用!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Airflow 2.x 中将 XComArg 转换为字符串值? 的相关文章

随机推荐

  • UNIX 中“./”和“sh”的区别

    有时我发现很少有脚本是通过 sh 命令执行的 有时是通过 命令执行的 我无法理解它们之间的确切区别 请帮助我 sh file在新的 shell 进程中执行 shell 脚本文件 file在当前 shell 进程中执行 shell 脚本文件
  • 命令行终端上的乘法

    我正在使用串行终端为我们的实验室实验提供输入 我发现使用 echo 5X5 只返回一个字符串 5X5 有没有执行乘法运算的命令 是的 您可以使用bash 的内置算术扩展 https www gnu org software bash man
  • 如何解决“不支持关键字:‘元数据’”?

    我无法连接到 SQL Server 我的项目的连接字符串是
  • 使用图权重提升深度优先访问者最小生成树

    我想从具有边权重的顶点创建最小生成树 并以深度优先顺序遍历图 我可以构建图表和最小生成树 但我无法编写自定义访问者 include
  • WinHttpSendRequest 失败并显示 ERROR_WINHTTP_SECURE_FAILURE

    以编程方式与网络进行通信不是我的专业领域 但我设法通过从网上找到的示例中剪切和粘贴代码来创建 read web page 函数 并且该代码已经连续好几个月每天正常运行 碰巧的是 我工作时的主 Windows 10 电脑坏了 在等待维修时 我
  • PHP - 读取和修复大型无效 XML 文件

    我必须读取一些相当重的 XML 文件 200 MB 到 1 GB 之间 其中一些文件是无效的 让我举一个小例子
  • 为什么最终没有被调用?

    我有几个关于java中的垃圾收集器的问题 Q1 据我了解 当对象超出范围并且 JVM 即将收集垃圾时 finalize 就会被调用 我认为 Finalize 方法是由垃圾收集器自动调用的 但在这种情况下它似乎不起作用 解释是什么 为什么需要
  • ObjC Plist 文件读取比 JSON 快?

    我做过这个测试项目https github com danielpetroianu FileDeserializeBenchmarking https github com danielpetroianu FileDeserializeBe
  • jQuery 错误? .appendTo() 在 IE7 中不起作用

    我正在尝试为 jQuery 创建一个选项传输插件 我可以在 Opera Firefox Chrome 和 Safari 中使用基本功能 但 IE7 无法配合 IE7 中的传递函数的运行似乎非常零散且难以理解 我创造了一个示例页面来说明我的问
  • Three.JS - 粒子沿随机方向绕点运行形成球体

    我有一个粒子系统 其中所有粒子都位于相同的坐标处 并且在随机方向上一个接一个地 它们 应该 开始绕场景中心运行 形成一个球体 到目前为止 我成功实现的是一组 Vector3 对象 粒子 它们一个接一个地开始沿着 Z 轴绕中心运行 只需根据当
  • 将 bigint 转换为日期时间

    我想将一个值从 bigint 转换为 datetime 例如 我正在阅读HISTORY表的团队城市服务器 在场上构建启动时间服务器 我在一条记录 1283174502729 上有这个值 如何将其转换为日期时间值 这对你有用吗 它在 SQL
  • xsl string-join() 多个变量 - 仅使用非空

    我想创建几个 xsl variable 它们可能为空 也可能不为空 然后加入它们
  • BigQuery 中有自动增量吗?

    BigQuery 中是否有 AUTO INCRMENT SERIAL IDENTITY 或序列之类的内容 我知道 ROW NUMBERhttps cloud google com bigquery query reference row n
  • 如何快速检查是否使用 Perl 安装了 Linux `unzip`?

    如何快速检查是否是Linuxunzip是使用 Perl 安装的吗 which unzip 如果有输出 则它指向解压缩的位置 如果没有输出 则不会显示任何内容 这依赖于解压缩在您的路径上
  • UISegmentedControl setSelectedSegmentIndex:没有 valueChanged 操作

    我正在通过代码设置 UISegmentedControl 的 selectedSegmentIndex 每当我这样做时 就会调用 valueChanged 操作 这对我来说听起来很合乎逻辑 但是有没有办法在不调用操作的情况下设置选定的段 它
  • Powershell 更新失败

    当我跑步时Update Help它在 Powershell 中失败 我不通过代理 这是直接访问 我还以管理员身份运行 Powershell 我不知道还要检查什么 欢迎任何建议 这是我的版本 PSVersionTable Name Value
  • 如何确定 Windows/IIS 上的文件编码?

    从答案到这个问题 https stackoverflow com questions 2453647 why are accented characters rendering inconsistently when accessing t
  • 我如何显示提交做了什么?

    我知道的一个愚蠢的方法是 git diff commit number1 commit number2 有没有更好的办法 我的意思是 我想知道 commit1 本身 我不想在它之前添加 commit2 作为参数 git show
  • 将 WPF 控件设置为扩展以填充可用空间,仅此而已

    如何设置 WPF 控件来填充其父级容器中的可用空间 但不展开父级 以下代码片段描述了我正在尝试的布局 我想要Grid伸展以适应Expander 我想要ListBox只为了填补Grid 我想要ListBox的滚动条出现时Grid太小 无法显示
  • 如何在 Airflow 2.x 中将 XComArg 转换为字符串值?

    Code from airflow models import BaseOperator from airflow utils decorators import apply defaults from airflow providers