Google Dataflow(Apache Beam)JdbcIO批量插入mysql数据库

2024-01-28

我正在使用 Dataflow SDK 2.X Java API ( Apache Beam SDK) 将数据写入 mysql。我创建了基于管道Apache Beam SDK 文档 https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/io/jdbc/JdbcIO.html使用dataflow将数据写入mysql。它一次插入单行,因为我需要实现批量插入。我在官方文档中没有找到任何启​​用批量插入模式的选项。

想知道是否可以在数据流管道中设置批量插入模式?如果是,请让我知道我需要在下面的代码中更改什么。

 .apply(JdbcIO.<KV<Integer, String>>write()
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
          .withUsername("username")
          .withPassword("password"))
      .withStatement("insert into Person values(?, ?)")
      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
        public void setParameters(KV<Integer, String> element, PreparedStatement query) {
          query.setInt(1, kv.getKey());
          query.setString(2, kv.getValue());
        }
      })

编辑 2018-01-27:

事实证明,这个问题与DirectRunner有关。如果您使用 DataflowRunner 运行相同的管道,您应该获得实际上最多 1,000 条记录的批次。 DirectRunner 在分组操作后始终创建大小为 1 的包。


原答案:

使用 Apache Beam 的 JdbcIO 写入云数据库时,我遇到了同样的问题。问题是,虽然 JdbcIO 确实支持批量写入最多 1,000 条记录,但我从未真正见过它一次写入超过 1 行(我必须承认:这总是在开发环境中使用 DirectRunner)。

因此,我向 JdbcIO 添加了一项功能,您可以通过将数据分组在一起并将每个组作为一个批次写入来自行控制批次的大小。下面是基于 Apache Beam 原始 WordCount 示例的如何使用此功能的示例。

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String> create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write the words to the database
    .apply(JdbcIO.<String> writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

与JdbcIO普通写入方法的区别在于新的方法writeIterable()这需要一个PCollection<Iterable<RowT>>作为输入而不是PCollection<RowT>。每个 Iterable 都作为一批写入数据库。

添加此内容的 JdbcIO 版本可以在此处找到:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java

包含上述示例的整个示例项目可以在这里找到:https://github.com/olavloite/spanner-beam-example https://github.com/olavloite/spanner-beam-example

(Apache Beam 上还有一个待处理的拉取请求,希望将其包含在项目中)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Google Dataflow(Apache Beam)JdbcIO批量插入mysql数据库 的相关文章

  • 通过货币换算获取每种产品类型的最低价格

    我想选择每种产品类型中最便宜的 包括运费 价格转换为当地货币 最便宜 产品 价格 产品 运费 seller to aud 我的数据库有如下表 PRODUCTS SELLERS id type id seller id price shipp
  • 在docker中使用MySQL数据库设置aspnetcore

    我正在尝试设置一个 docker compose 文件 其中包含 asp net core mysql 数据库和 phpmyadmin 的容器 设置我的 mysql 服务器没有问题 我可以使用 phpmyadmin 访问它 我的 asp n
  • MySQL 服务器未启动

    当我做 mysql u root p并输入my password这就是我得到的 错误 2002 HY000 无法通过套接字 var run mysqld mysqld sock 连接到本地 MySQL 服务器 2 所以我输入 systemc
  • PHP 选择后立即删除

    我有一个 PHP 服务器脚本 它从 MySQL 数据库中选择一些数据 一旦我将 mysql query 和 mysql fetch assoc 的结果存储在我自己的局部变量中 我就想删除我刚刚选择的行 这种方法的问题在于 PHP 似乎对我的
  • MySQL 存储过程将值分配给 select 语句中的多个变量

    这是我的存储过程 我在为声明的变量赋值时遇到问题 当我执行它时 插入和更新命令工作正常 但声明变量的值保持为 0 但我在数据库中有一些价值 我怎样才能正确地做到这一点 BEGIN DECLARE PaidFee INT DEFAULT 0
  • 在 MacOSX10.6 上运行 python 服务器时 MySQLdb 错误

    运行我的服务器 python manage py runserver 产生以下错误 django core exceptions ImproperlyConfigured 加载 MySQLdb 模块时出错 没有名为 MySQLdb 的模块
  • 比较表中的行以了解字段之间的差异

    我有一个包含 20 多列的表 客户端 其中大部分是历史数据 就像是 id clientID field1 field2 etc updateDate 如果我的数据如下所示 10 12 A A 2009 03 01 11 12 A B 200
  • Mysql innoDB 不断崩溃[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我的数据库 mysql 服务器不断崩溃 重新启动 我不知道该怎么办 我不断在 dbname org err 文件中收到以下内容 13120
  • 当sql连接中存在两个同名列时,如何从一个表列中获取值

    当我连接两个具有相同名称列的表时 我目前面临着尝试获取值的问题 例如 table1 date和table2 date 每个表中的日期不同 我将如何获取 日期 本例中的表1 我目前正在跑步 while row mysqliquery gt f
  • 截断 Mysql 表 Cron 作业?

    我在如何使用 cron 作业截断 Mysql 表时遇到了一些麻烦 无论我尝试什么 我似乎都无法让数据库清除表格 感谢您的帮助 mysql uderp example pexample hlocalhost Dexample e TRUNCA
  • MySQL Connector/C++ 库链接错误问题

    PROBLEM 好吧 我一直在尝试遵循 MySQL Forge Wiki 和其他一些网站上的示例代码 这些网站提供了有关如何获得简单数据库连接的教程 但由于某种原因 我的项目总是因链接错误而失败 我可以我自己不明白为什么或如何解决它 我仍在
  • Mysql 检索所有有限制的行

    我想检索特定用户的所有行 限制为 0 x 所以我只是想问是否有任何方法可以检索 mysql 中的所有行 而不调用返回 x 的 count id 的方法 而不重载现有函数 该函数在查询中根本没有限制 与我们的 string Relace 功能
  • MYSQL 区分大小写的 utf8 搜索(使用 hibernate)

    我的登录表具有 utf8 字符集和 utf8 排序规则 当我想要检查用户名并检索该特定用户名的其他信息时 hql 查询会为我提供小写和大写相同的结果 我应该如何处理适用于案例的 HQL 查询 我使用 Mysql 5 和 java hiber
  • MySQL MIN/MAX 所有行

    我有桌子Races与行ID Name and TotalCP 我选择分钟 TotalCP FROM Races 但是我想选择具有最小值的整行 我如何在单个查询中做到这一点 从聚合值获取整行的一般形式是 SELECT FROM Races W
  • 将古吉拉特语文本插入 MySQL 表会产生垃圾字符和不可读的文本

    我有三个 MySQL 表 我正在向其中插入古吉拉特语内容 当我插入两个表时 它们插入得很好并且可读 但在一个表中 它显示垃圾字符 不可读的文本 我怎样才能解决这个问题 MySQL 有每个表的字符集设置 http dev mysql com
  • MySQL 查询计算上个月

    我想计算上个月的订单总额 我收到了从当前日期获取当月数据的查询 SELECT SUM goods total AS Total Amount FROM orders WHERE order placed date gt date sub c
  • 将第三个表链接到多对多关联中的桥接表

    设计这个数据库的正确方法是什么 这是我设置表格的方式 我在名为 教师 的表和名为 仪器 的表之间存在多对多关系 然后我有一个连接两者的桥接表 我想将另一个表与 BRIDGE 表关联起来 意思是乐器 老师的组合 该表有 3 行 指定老师可以教
  • MySQL 查询到 CSV [重复]

    这个问题在这里已经有答案了 有没有一种简单的方法来运行MySQL查询来自linux命令行并以csv格式输出结果 这就是我现在正在做的事情 mysql u uid ppwd D dbname lt lt EOQ sed e s g tee l
  • 映射 mysql 中同一个表的多个值

    您好 我必须使用另一个表中的值 id 获取文本值 表 1 包含值 ID 表 2 包含名称和值 ID 表 1 SEVERITY OCCURENCE DETECTABILITY 2 3 4 表 2 id name value 1 Very Hi
  • 在 MySQL 中存储表情符号的编码问题:如何使用 Prisma ORM 在 NodeJS 中定义字符排序规则?

    亲爱的 Nodejs 专家和数据库专家 我们在 MySQL 数据库中存储表情符号和其他特殊字符时遇到问题 我们使用 Prisma 得到一个错误 这是我们使用的 ORM 参数无法从排序规则 utf8 general ci 转换为 utf8mb

随机推荐

  • 通过搜索文档找到最小片段的算法?

    我一直在阅读斯基纳 Skiena 出色的 算法设计手册 并沉迷于其中的一个练习 问题是 给定一个由三个单词组成的搜索字符串 找到包含所有三个搜索单词的文档的最小片段 即其中单词数量最少的片段 您将获得这些单词在搜索字符串中出现的索引位置 例
  • 如何在 postgresql 中找到死元组的大小?

    如何在 postgresql 中找到死元组的大小 我已经使用 pg dump 创建了数据库备份并将其恢复到其他服务器上 我发现两个数据库中的数据库大小存在差异 5 GB 我已经验证了表中的活元组和死元组 由于当前数据库中添加了新数据 存在行
  • 如何从子进程(通过 Parallel::ForkManager 分叉)传递变量?

    我的查询 在下面的代码中我试图打印 commandoutput 0 被转移或传递到即将到来的子程序中 我尝试通过转移来传递它 但我失败了 你能帮我正确的方法吗 Code my max forks 4 createThreads my com
  • 从具有开始/结束日期的行创建年份序列行的数据框

    我对 R 和编码来说是一个相对较新的用户 我已经搜索过但无法解决这个问题 我有以下数据 groupid start date end date Status 1 2014 01 01 2017 01 01 A 1 2018 01 01 20
  • Python - 检查字母是否在列表中

    如果一个字母 字符串 在列表中 find letter o 你好 c 再见 返回 True 否则返回 False def find letter lst lst o hello 1 n o if not lst return 0 elif
  • 响应式背景图像 bootstrap 3

    我正在使用 bootstrap 并试图使我的背景图像具有响应能力 但它不起作用 这是我的代码 html div class row div class bg img src img home bg jpg alt home backgrou
  • `Class of` 类型声明的含义是什么?

    在查看我的代码之一时 我陷入了如下的一项声明 TMyObjectClass class of TMyObject 我有点困惑 想知道这句话的含义是什么 作为TMyObjectClass声明之上没有任何声明 and TMyObject声明如下
  • ElasticSearch:未分配的分片,如何修复?

    我有一个有4个节点的ES集群 number of replicas 1 search01 master false data false search02 master true data true search03 master fals
  • 按下按钮刷新 tkinter 框架

    我正在使用来自的代码在 tkinter 中的两个框架之间切换 https stackoverflow com questions 7546050 switch between two frames in tkinter制作我的图形用户界面
  • 在 Swift 中将参数传递给选择器

    我正在构建一个应用程序来跟踪大学课程的阅读作业 每个 ReadingAssignment 都包含一个 Bool 值 指示读者是否已完成阅读作业 ReadingAssignments 被收集到 WeeklyAssignment 数组中 我希望
  • 具有相同标签的不同散点图标记

    我遇到了 类似 的问题Matplotlib 一个标签具有多个不同标记的图例 https stackoverflow com questions 9262307 matplotlib legend with multiple differen
  • 多个域的集成 Windows 身份验证

    我有一个针对域 A 用户的 Asp net 网站 具有集成 Windows 身份验证 现在B域的用户需要访问该网站 但域B用户访问网站时会弹出窗口输入验证信息 我的问题是 如何配置IIS或Windows Server 让B域用户像A域用户一
  • XAML 文件的智能感知 (Xamarin.Forms)

    在 Visual Studio 15 中 如何让 XAML Intellisense 适用于 Xamarin Forms 中的 XAML 文件 编辑 将答案与问题分开 这个解决方案是在这个论坛 https social msdn micro
  • 如何检测 Angular 中属性的更改

    我有一个带有子组件的组件timeline
  • $null 应该位于相等比较的左侧吗? (-eq 与数组)

    与同事讨论 应该 null是在支票的左边还是右边 有什么例子可以说明为什么这很重要吗 abc null null eq abc True abc eq null True All ok abc 6 7 null 8 9 null eq ab
  • 使用 NPOI 将图像插入 Excel 文件

    我正在使用 C 在 Visual Studio 2010 中编写一个程序 并且正在使用 NPOI 库 我正在尝试将图像插入到 Excel 文件中 我尝试了两种不同的方法 但都不起作用 Method 1 HSSFPatriarch patri
  • 服务重启后 Docker 节点宕机

    我的服务器似乎空间不足 并且某些已部署的 Docker 堆栈出现了一些问题 我花了一段时间才弄清楚 但最终我做到了 并删除了一些容器和图像以释放一些空间 我能够跑service docker restart它起作用了 然而 也存在一些问题
  • 通过java应用程序发送附有excel文件的电子邮件 - 不起作用

    我试图通过Java应用程序发送一封邮件 其中包含excel文件作为附件 而不实际创建该文件 excel文件中的数据来自数据库 我可以发送带有附件的邮件 但文件是文本 制表符分隔 格式 但我希望该文件仅为 Excel 格式 请帮忙 以下是代码
  • 在 Java 调试器中,如何忽略从未通过我的代码的异常

    我目前正在使用 IntelliJ IDEA 进行 Java 开发 但我也对针对其他 IDE 的答案或调试 Java 代码的一般概念感兴趣 因为我在许多 IDE 中都错过了这个功能 所以我不确定在从其他语言转移我的调试习惯时是否错过了工作流程
  • Google Dataflow(Apache Beam)JdbcIO批量插入mysql数据库

    我正在使用 Dataflow SDK 2 X Java API Apache Beam SDK 将数据写入 mysql 我创建了基于管道Apache Beam SDK 文档 https beam apache org documentati