我使用 apache beam 和 google dataflow runner 将数据从 kafka 流式传输到 BigQuery。
我想利用 insertId 进行重复数据删除,我在谷歌文档中找到了描述。但即使插入是在几秒钟之内发生的,我仍然看到很多具有相同 insertId 的行。
现在我想知道也许我没有正确使用 API 来利用 BQ 提供的流式插入的重复数据删除机制。
我在beam中编写的代码如下所示:
payments.apply("Write Fx Payments to BQ", BigQueryIO.<FxPayment>write()
.withFormatFunction(ps -> FxTableRowConverter.convertFxPaymentToTableRow(ps))
.to(bqTradePaymentTable)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
除了所有其他字段之外,我还直接在 FxTableRowConverter.convertFxPaymentToTableRow 方法中的 TableRow 上设置 insertId,并将其作为格式函数传递给 BigQueryIO:
row.set("insertId", insertId);
我还将该字段作为一列添加到 BQ 中。没有它,插入就会失败(显然)。
除了将 insertId 添加到 TableRow 对象之外,我找不到任何其他方法可以直接在 BigQueryIO 上设置 insertId 。
这是使用它的正确方法吗?因为它对我不起作用,所以我看到了很多重复,即使我不应该看到,因为就像我已经提到的那样,插入在几秒钟内发生。 BigQuery 文档指出流缓冲区将 insertId 保留至少一分钟。
您无法在 Dataflow 中手动指定 BigQuery 流式传输的 insertIdhttps://stackoverflow.com/a/54193825/1580227
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)