从 Beam 管道连接 google cloud sql postgres 实例

2023-12-06

我想从在谷歌数据流上运行的 apache beam 管道连接谷歌云 sql postgres 实例。
我想使用 Python SDK 来完成此操作。
我无法为此找到适当的文档。
在云SQL如何指导我没有看到任何数据流文档。
https://cloud.google.com/sql/docs/postgres/

有人可以提供文档链接/github 示例吗?


您可以使用关系数据库.Write and 关系数据库.读取转变自梁块如下:

首先安装beam-nuggets:

pip install beam-nuggets

供阅读:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    records = p | "Reading records from db" >> relational_db.Read(
        source_config=source_config,
        table_name='months',
    )
    records | 'Writing to stdout' >> beam.Map(print)

对于写作:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    months = p | "Reading month records" >> beam.Create([
        {'name': 'Jan', 'num': 1},
        {'name': 'Feb', 'num': 2},
    ])
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
        create_if_missing=True,
    )
    table_config = relational_db.TableConfiguration(
        name='months',
        create_if_missing=True
    )
    months | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 Beam 管道连接 google cloud sql postgres 实例 的相关文章

随机推荐

  • 使用 C# 正则表达式转换字符串中的大小写

    我怎样才能转换这个字符串 bKk 035A paint House V003 to BKK 035a paint House v003 使用正则表达式 例如 Regex Replace 此正则表达式匹配字符串
  • 如何获取HttpWebResponseMessage的内容

    我有一个 asp net MVC razor C 应用程序 它有 1 个控制器和 1 个接受参数的 POST 函数 该函数返回一个 HttpResponseMessage public class VersionController Con
  • 手动清理项目内的 obj 文件安全吗?

    我的目标文件包含调试 发布文件夹和一些其他文件 我的目标文件大约有 1GB 手动清洁安全吗 如果我在项目中单击 清理 它不会删除调试文件夹中的所有内容 这就是为什么我想手动制作 是的 你可以清除里面的所有东西bin and obj文件夹 它
  • 苹果应用程序因 IPV6 崩溃而被拒绝

    我使用 Dotnet 的网络服务创建了一个 ios 应用程序 并于 1 个月前成功启动 我将我的应用程序与网络服务同步连接 现在我已经更改了应用程序 应用程序正在与网络服务异步连接 并将应 用程序放在应用程序商店中 但苹果拒绝了该应用程序
  • 如何在 Tkinter 中添加占位符

    我如何将占位符添加到条目中tkinter 我不相信它有像 HTML 这样的占位符功能 我发现要使文本在单击时消失 您必须添加一个onclick事件 但我如何创建onclick事件以及如何让文本首先出现 这是我正在使用的代码 我想说 在此处输
  • 什么是运算符的结合性以及为什么它很重要?

    对于运算符来说什么是结合性以及为什么它很重要 对于运算符来说 关联性意味着当同一运算符出现在一行中时 我们首先应用哪个运算符出现 下面 让Q成为操作员 a Q b Q c If Q是左结合的 那么它的计算结果为 a Q b Q c 如果它是
  • getElementsByTagName() 方法未按预期工作

    我试图简单地更改所有内容中的文本具有此代码的元素 p p
  • 需要在java中以特定时间间隔运行特定功能

    我有一个具体的要求 我有一个通知功能 可以将电子邮件发送到所有系统客户端 代码是用java写的 我想做的是 我想执行特定的工作 以特定的时间间隔发送电子邮件 例如每两天一次或每周或每月一次 截至目前我不知道如何做到这一点 我们正在使用 Qu
  • 解决 android 中的 java.lang.Throwable 异常

    我正在开发一个带有 sqlite 数据库的 Android 应用程序 在我的应用程序中编写与数据库操作相关的代码后 我遇到了以下异常 虽然我的应用程序没有停止 崩溃 但我想知道如何解决此异常 我不确定为什么会发生这个异常 但我认为它与 sq
  • C中不使用信号量的数据同步

    我需要在我的代码中进行数据同步 目前 我正在访问中断内以及本地函数中的全局值 如果中断调用频繁 则可能会损坏数据 我需要避免这种情况 我的代码中没有使用操作系统 因此无法使用信号量 使用与信号量类似的锁定方法可能会解决我的问题 任何帮助 将
  • 存储 PBKDF2 加密密码时使用什么数据类型?

    我正在使用 SimpleCrypto Net 来加密我的密码 据我所知 它使用 PBKDF2 和指定的盐和迭代次数 我想知道在数据库中设置密码列最适合我的数据类型是什么 从代码来看 结果是一个 64 字节的 Base64 编码密钥 考虑到
  • 在同一个azure web应用程序上创建虚拟目录

    我有一个在 Azure 上运行的应用程序 我需要复制该应用程序 以便不同的用户可以访问例如 site com s1 和 site com s2 并单独使用它们 应用程序是一样的 数据库会变 我尝试为我的应用程序创建一个虚拟目录 但每次都会遇
  • 检查窗口打开

    var windowUrl var windowName mywin var w window open windowUrl windowName windowSize w document write html w document cl
  • 使用不同的.Net 语言?

    是否有不同受欢迎程度的细分 Net 语言可用的 有谁知道任何提供此信息的调查 或者即使有可能确定这一点 Update 答案是not不同 Net 语言的列表 我希望看到显示每种 Net 语言的相对使用 流行程度的统计数据 谢谢 如果你像我一样
  • 访问受 Cloudflare 保护的网页

    首先 我想道歉 以防我的问题可能无法提供足够的连接或任何其他问题 我现在正在手机上输入此内容 因此 我正在开发一个项目 需要我在网页中自动执行任务 为了做到这一点 第一步是首先访问该页面 但我遇到了一个障碍 我尝试过搜索和想通了也无济于事
  • 如何使用 C# 迭代和统计 Word 文档中的修订次数?

    我一直在寻找这个问题 但找不到任何答案 所以希望这里的人可以提供帮助 我正在用 C 编写一个 WinForms 应用程序 其中我使用 Word Application Compare Documents 来比较两个文档并获取一个结果文档 其
  • shouldAutorotate To InterfaceOrientation:从未调用过

    我在 部署信息 下将支持的界面方向设置为除纵向倒置之外的所有方向 我想重写 shouldAutorotateToInterfaceOrientation 以实现自定义行为 即根据条件支持景观 由于限制 自定义视图转换 我只有一个视图控制器
  • 如何使用 AVAudioPlayer 在 iPhone sdk 中暂停和恢复同一首歌曲

    我想暂停这首歌 然后在 iPhone 中使用编程方式从该持续时间点继续播放 当我尝试暂停歌曲并再次开始播放我暂停的歌曲时 如何对其进行编码 是否有任何直接属性或建议任何代码可以解决我的问题 void playMusic path NSBun
  • 如何使用 Nuxt 2 转译 node_modules 中的依赖项?

    我读过有关转译的问题node modulesNuxt 但据说新的 Nuxt 2 已经解决了这个问题transpile选项中的nuxt config js file https nuxtjs org api configuration bui
  • 从 Beam 管道连接 google cloud sql postgres 实例

    我想从在谷歌数据流上运行的 apache beam 管道连接谷歌云 sql postgres 实例 我想使用 Python SDK 来完成此操作 我无法为此找到适当的文档 在云SQL如何指导我没有看到任何数据流文档 https cloud