如何使用 Cloud Composer 将大数据从 Postgres 导出到 S3?

2024-04-24

我一直在使用 Postgres to S3 运算符将数据从 Postgres 加载到 S3。但最近,我必须导出一个非常大的表,并且我的 Airflow Composer 失败,没有任何日志,这可能是因为我们正在使用 Python 临时文件模块的 NamedTemporaryFile 函数来创建临时文件,并且我们正在使用这个临时文件加载到 S3 。由于我们使用的是 Composer,因此这将被加载到 Composer 的本地内存中,并且由于文件的大小非常大,因此会失败。

参考这里:https://cloud.google.com/composer/docs/how-to/using/troubleshooting-dags#task_fails_without_emitting_logs https://cloud.google.com/composer/docs/how-to/using/troubleshooting-dags#task_fails_without_emitting_logs

我确实检查了 RedshiftToS3 运算符,因为它也使用 Postgres 钩子,并且它有几个可以轻松加载大文件的卸载选项,但我意识到 Redshift 和 Postgres 之间没有 1-1 对应关系。所以这是不可能的。有什么方法可以拆分我的 Postgres 查询吗?现在我正在做SELECT * FROM TABLENAME另外,我没有任何有关该表的信息。

我也遇到过这个类似的运算符:https://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/sql_to_gcs.html https://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/sql_to_gcs.html

这里有一个参数approx_max_file_size_bytes:

该运算符支持将大型表转储拆分为 多个文件(请参阅上面文件名参数文档中的注释)。这 param 允许开发人员指定分割的文件大小。

我从代码中了解到的是,当大小超过给定限制时,他们正在创建一个新的临时文件,那么他们是否会将文件拆分为多个临时文件,然后分别上传?

编辑: 我将再次准确地解释我想要做什么。目前,Postgres 到 S3 操作符会创建一个临时文件,并将游标返回的所有结果写入该文件,这会导致内存问题。所以我的想法是,我可以添加 max_file_size 限制,对于游标中的每一行,我将把结果写入临时文件,如果临时文件的大小超过我们设置的 max_file_size 限制,我们将写入我们的内容文件到 S3,然后刷新或删除该文件,然后创建一个新的临时文件并将光标的下一行写入该文件,并将该文件也上传到 S3。我不知道如何像这样修改运算符?


正如您已经发现的那样,这是因为您正在用表中的每一行构建一个字典,当表中有很多行时,机器上的内存就会耗尽。

您已经真正回答了自己的问题:仅写入 a 直到文件达到一定大小,然后将文件推送到 S3。或者,您可以将文件保留在磁盘上,并每 x 行刷新字典对象,但在这种情况下,您的文件可能会在磁盘上而不是在内存中变得非常大。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 Cloud Composer 将大数据从 Postgres 导出到 S3? 的相关文章

  • Matplotlib 标准化颜色条 (Python)

    我正在尝试使用 matplotlib 当然还有 numpy 绘制轮廓图 它有效 它绘制了它应该绘制的内容 但不幸的是我无法设置颜色条范围 问题是我有很多图 并且需要所有图都具有相同的颜色条 相同的最小值和最大值 相同的颜色 我复制并粘贴了在
  • 为什么我不能导入 geopandas?

    我唯一的代码行是 import geopandas 它给了我错误 OSError Could not find libspatialindex c library file 以前有人遇到过这个吗 我的脚本运行得很好 直到出现此错误 请注意
  • 如何在 Ubuntu 上安装 Python 模块

    我刚刚用Python写了一个函数 然后 我想将其做成模块并安装在我的 Ubuntu 11 04 上 这就是我所做的 创建 setup py 和 function py 文件 使用 Python2 7 setup py sdist 构建分发文
  • 如何更改充当按钮的范围的文本

    我正在为自定义 Web 应用程序编写自动化测试 我遇到了无法更改跨度文本的问题 我尝试过使用 driver execute script 但没有运气 如果我更好地了解 javascript 这确实会有帮助 据我所知 您无法单击跨度 并且列表
  • Dask DataFrame 的逐行处理

    我需要处理一个大文件并更改一些值 我想做这样的事情 for index row in dataFrame iterrows foo doSomeStuffWith row lol doOtherStuffWith row dataFrame
  • 如何自动替换多个文件的文本内容中的字符?

    我有一个文件夹 myfolder包含许多乳胶表 我需要替换其中每个字符 即替换任何minus sign by an en dash 只是为了确定 我们正在替换连字符INSIDE该文件夹中的所有 tex 文件 我不关心 tex 文件名 手动执
  • 在 python-docx 中搜索和替换

    我有一个包含以下字符串的文档 模板 你好 我的名字是鲍勃 鲍勃是一个很好的名字 我想使用 python docx 打开此文档并使用 查找和替换 方法 如果存在 来更改每个字符串 Bob gt Mark 最后 我想生成一个新文档 其中包含字符
  • 无法包含外部 pandas 文档 Pycharm v--2018.1.2

    我无法包含外部 pandas 文档Pycharm v 2018 1 2 例如 numpy gt http docs scipy org doc numpy reference generated module name element na
  • 将此 MySQL 查询转换为 PyGreSQL

    我正在开发一个 Ruby 应用程序 它使用 mysql 函数 XOR 和 BIT COUNT 不过 我现在需要在运行 PyGreSQL 的 Heroku 上运行该应用程序 我找不到任何可以帮助我的 PyGreSQL 文档 那么任何人都可以翻
  • 将 subprocess.Popen 的输出通过管道传输到文件

    我需要启动一些长时间运行的进程subprocess Popen 并希望拥有stdout and stderr从每个自动管道到单独的日志文件 每个进程将同时运行几分钟 我想要两个日志文件 stdout and stderr 每个进程当进程运行
  • VSCode pytest 测试发现失败

    Pytest 测试发现失败 用户界面指出 Test discovery error please check the configuration settings for the tests 输出窗口显示 Test Discovery fa
  • 如何在 Windows 上使用 Python 3.6 来安装 Python 2.7

    我想问一下如何使用pip install对于 Python 2 7 当我之前安装并使用 Python 3 6 时 我现在必须使用 Windows 上的 Python 版本 pip install 继续安装 Python 3 6 我需要使用以
  • Pandas:根据列名进行列的成对乘法

    我有以下数据框 gt gt gt df pd DataFrame ap1 X 1 2 3 4 as1 X 1 2 3 4 ap2 X 2 2 2 2 as2 X 3 3 3 3 gt gt gt df ap1 X as1 X ap2 X a
  • 使用 python 将文本发送到带有逗号分隔符的列

    如何使用分隔符 在 Excel 中将一列分成两列 并使用 python 命名标题 这是我的代码 import openpyxl w openpyxl load workbook DDdata xlsx active w active a a
  • 在pycharm中调试python代码

    这个问题类似于this https stackoverflow com questions 10240018 how to use pycharm to debug python script一 我正在尝试调试pyethapp https
  • Scrapy 蜘蛛无法工作

    由于到目前为止没有任何效果 我开始了一个新项目 python scrapy ctl py startproject Nu 我完全按照教程操作 创建了文件夹和一个新的蜘蛛 from scrapy contrib spiders import
  • Google App Engine 中的自定义身份验证

    有谁知道或知道我可以在哪里学习如何使用 Python 和 Google App Engine 创建自定义身份验证流程 我不想使用 Google 帐户进行身份验证 并且希望能够创建自己的用户 如果不是专门针对 Google App Engin
  • 从 dask 数据框中的日期时间序列获取年份和星期?

    如果我有一个 Pandas 数据框和一个日期时间类型的列 我可以按如下方式获取年份 df year df date dt year 对于 dask 数据框 这是行不通的 如果我先计算 像这样 df year df date compute
  • 将 Scikit-Learn OneHotEncoder 与 Pandas DataFrame 结合使用

    我正在尝试使用 Scikit Learn 的 OneHotEncoder 将 Pandas DataFrame 中包含字符串的列替换为 one hot 编码的等效项 我的下面的代码不起作用 from sklearn preprocessin
  • 将 docker-compose.yml 中的包安装到 docker 容器中

    我是 docker 和 docker compose 的初学者 我需要你的帮助 我正在使用 docker compose 制作 PHP NGINX PostgresQL symfony 开发环境 这里是 web image nginx 1

随机推荐