我有一个用例,我读取存储在谷歌云存储中的换行 json 元素并开始处理每个 json。在处理每个 json 时,我必须调用外部 API 来进行重复数据删除,无论该 json 元素之前是否被发现。我正在做一个ParDo
with a DoFn
在每个 json.
我还没有看到任何在线教程说明如何从 apache beam 调用外部 API 端点DoFn
数据流。
我在用着JAVA
Beam的SDK。我学习的一些教程解释说,使用startBundle
and FinishBundle
但我不清楚如何使用它
如果您需要检查外部存储中每个 JSON 记录的重复项,那么您仍然可以使用DoFn
为了那个原因。有几个注释,例如@Setup
, @StartBundle
, @FinishBundle
等,可用于注释您的方法DoFn
.
例如,如果您需要实例化客户端对象以将请求发送到外部数据库,那么您可能需要在@Setup
方法(如 POJO 构造函数),然后在您的@ProcessElement
method.
让我们考虑一个简单的例子:
static class MyDoFn extends DoFn<Record, Record> {
static transient MyClient client;
@Setup
public void setup() {
client = new MyClient("host");
}
@ProcessElement
public void processElement(ProcessContext c) {
// process your records
Record r = c.element();
// check record ID for duplicates
if (!client.isRecordExist(r.id()) {
c.output(r);
}
}
@Teardown
public void teardown() {
if (client != null) {
client.close();
client = null;
}
}
}
此外,为了避免对每条记录进行远程调用,您可以将捆绑记录批处理到内部缓冲区(将输入数据分束到捆绑中)并以批处理模式检查重复项(如果您的客户端支持此操作)。为此,您可以使用@StartBundle
and @FinishBundle
带注释的方法将在相应处理 Beam 束之前和之后调用。
对于更复杂的示例,我建议查看不同 Beam IO 中的 Sink 实现,例如运动IO https://github.com/apache/beam/blob/e65d457e8ccaddaf291de553bd17e8035ef7f43a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java#L603, 例如。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)