Apache Beam -> BigQuery - 用于重复数据删除的 insertId 不起作用

2023-12-10

我使用 apache beam 和 google dataflow runner 将数据从 kafka 流式传输到 BigQuery。 我想利用 insertId 进行重复数据删除,我在谷歌文档中找到了描述。但即使插入是在几秒钟之内发生的,我仍然看到很多具有相同 insertId 的行。 现在我想知道也许我没有正确使用 API 来利用 BQ 提供的流式插入的重复数据删除机制。

我在beam中编写的代码如下所示:

payments.apply("Write Fx Payments to BQ", BigQueryIO.<FxPayment>write()
            .withFormatFunction(ps -> FxTableRowConverter.convertFxPaymentToTableRow(ps))
            .to(bqTradePaymentTable)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

除了所有其他字段之外,我还直接在 FxTableRowConverter.convertFxPaymentToTableRow 方法中的 TableRow 上设置 insertId,并将其作为格式函数传递给 BigQueryIO:

row.set("insertId", insertId);

我还将该字段作为一列添加到 BQ 中。没有它,插入就会失败(显然)。 除了将 insertId 添加到 TableRow 对象之外,我找不到任何其他方法可以直接在 BigQueryIO 上设置 insertId 。

这是使用它的正确方法吗?因为它对我不起作用,所以我看到了很多重复,即使我不应该看到,因为就像我已经提到的那样,插入在几秒钟内发生。 BigQuery 文档指出流缓冲区将 insertId 保留至少一分钟。


您无法在 Dataflow 中手动指定 BigQuery 流式传输的 insertIdhttps://stackoverflow.com/a/54193825/1580227

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Beam -> BigQuery - 用于重复数据删除的 insertId 不起作用 的相关文章

随机推荐

  • ImageField image_width 和 image_height 自动填充现有数据库?

    我有一个现有的数据库表ImageField已经有一堆现有的注册表 我想添加image width and image height在我的模型上 但我需要一些东西来自动填充现有的表格 class ItemImage models Model
  • 滚动时粘性导航栏?

    我目前正在为自己构建一个网站 我发现在多个网站上都有这种非常棒的效果 其中导航栏位于图像下方 但当您滚动经过它时 它会粘在顶部 Example 我怎样才能实现这个目标 另外 如何才能达到类似的效果该网站具有类似的导航栏样式 这是 Matth
  • 为什么设置断点可以让我的代码工作?

    我很新C所以我确信我做错了很多事情 但这让我感到困惑 我的代码应该从用户那里获取标题 并在路由目录中创建一个具有该名称的文件夹 仅当我在上面设置断点时它才有效makeFolder 执行 由于某种原因 在我点击之前稍作休息continue让它
  • Perl + POO 和 Mysql 错误

    我刚刚学了 poo 我开始使用 perl 实现了这一点 但我没有得到预期的输出 mysql 有问题吗 还是坏代码 另外 相同的查询在控制台和工作台上运行 并且此模块添加 chmod x module pm usr bin perl use
  • 如何从外部链接导航到 Bootstrap 4 的特定选项卡

    我想从外部页面链接打开 bootstrap 4 的特定选项卡 第 1 页 nav html 我想从这一页转到第二页 a href index html tab 1 tab 1 a a href index html tab 2 tab 2
  • 调整 QWidget 大小时忽略最小尺寸

    有没有办法让 QWidget 及其任何子类 完全忽略其最小大小 我想要的是 QPushButton 在尺寸太小时时被切断 而不是阻止窗口调整大小 默认行为 您可以使用 button setSizePolicy QSizePolicy Ign
  • 使用 selenium python 使用复合类解析 HTML 内容

    我的 GUI 中有一个显示按钮 用于显示连接状态 带有绿色勾号的按钮表示已建立连接 带有红色叉号的按钮表示没有连接 我必须使用我的代码检查状态 我正在解析该特定标题栏类名 容器流体 的内容 由此 我正在解析该显示按钮的显式内容 elem d
  • 如何从 SQLite 数据库中检索用户位置特定范围内的一组位置

    我有一些位置坐标存储在我的SQLite数据库表 我想检索距用户当前位置 1 公里范围内的位置 现在我正在从数据库中获取所有值 并编写了一个方法来检索我范围内的值 这给我带来了巨大的开销 因为我的表可能包含超过 1000 个坐标 所以我正在寻
  • EF 6:另一个复杂类型中的嵌套复杂类型

    假设我有一个名为 car 的实体 我使用复杂类型来定义 引擎 部分 TableName T CAR public sealed class Car IEngine public EngineType EngineType get set C
  • 使用 Angular js 进行应用程序设计

    一直致力于使用 Angular JS 构建一个巨大的应用程序 在同一主题上看到了太多关于如何最好地设计应用程序的问题 但仍然感到困惑 一个基本的应用程序通常有一个 1 登录页面 索引页面 2 主页 带页眉和页脚 身体是partial com
  • 将两个数据框导出为一个 Excel 文件,并在指定位置的 pandas 中包含两张表

    我有两个数据框 如下所示 df1 Date t factor plan plan score 0 2020 02 01 5 NaN 0 1 2020 02 02 23 NaN 0 2 2020 02 03 14 start 0 3 2020
  • Javascript如何获取所选项目的ID

    我正在使用 Web2Py 将值列表发送到我的视图 我的 HTML 是这样的
  • Excel VBA 计算另一张工作表中的公式

    已解决 问题出在我的公式中 我使用 INDIRECT 引用单元格 当工作表不同时 该单元格不起作用 查看答案 我在一张纸上有一个公式 我想要做的是使用另一张纸上的公式 使用 eval 来评估该公式 然而 结果并不如预期 该公式似乎使用工作表
  • 使用 Chrome 查找 JavaScript 内存泄漏

    我创建了一个非常简单的测试用例 它创建一个 Backbone 视图 将处理程序附加到事件 并实例化一个用户定义的类 我相信 通过单击此示例中的 删除 按钮 所有内容都会被清理 并且不会出现内存泄漏 代码的 jsfiddle 在这里 http
  • 使用单例进行单元测试

    我已经使用 Visual Studio Team Edition 测试框架准备了一些自动测试 我希望其中一项测试按照程序中完成的正常方式连接到数据库 string r providerName ConfigurationManager Co
  • Delphi 字典和排序数据

    我的代码是 procedure TfrmSettings btnFillDictClick Sender TObject var Dict TDictionary
  • 在 Node.js 中复制 Java 密码哈希代码 (PBKDF2WithHmacSHA1)

    编辑 我的问题已更新 请查看这篇文章的底部以了解最新一期 我把剩下的留给那些想阅读整个故事的人 我一直致力于将一个小型 Java 应用程序转换为 Node js 大部分进展顺利 我必须查找大量 Java 函数来弄清楚它们的作用以及如何在 N
  • 获取mysql查询中一行的排名

    我使用此查询根据每个名字获得的票数为他们分配排名 但它返回错误 1248 每个派生表必须有自己的别名 这是我的代码 SELECT rownum rownum 1 AS rank name vote FROM table SELECT row
  • getJSON 不支持 async:false

    我下面有这段代码 它应该返回调用的结果 我需要同步执行此操作 以便我知道一切都很好 但它似乎不起作用 我究竟做错了什么 jQuery library http code jquery com jquery 1 9 1 min js func
  • Apache Beam -> BigQuery - 用于重复数据删除的 insertId 不起作用

    我使用 apache beam 和 google dataflow runner 将数据从 kafka 流式传输到 BigQuery 我想利用 insertId 进行重复数据删除 我在谷歌文档中找到了描述 但即使插入是在几秒钟之内发生的 我