我正在尝试使用 Apache Beam 的重复数据删除功能对来自 Google Cloud Pubsub 的输入消息进行重复数据删除。但是,我创建后遇到错误KV<String, MyModel>
配对并将其传递给Deduplicate
转换。
Error:
ParDo requires a deterministic key coder in order to use state and timers
Code:
PCollection<KV<String, MyModel>> deduplicatedEvents =
messages
.apply(
"CreateKVPairs",
ParDo.of(
new DoFn<MyModel, KV<String, MyModel>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element().getUniqueKey(),c.element()));
}
}))
.apply(
"Deduplicate",
Deduplicate.<KV<String, MyModel>>values());
我应该如何创建可以将字符串编码/解码为密钥的确定性编码器才能使其工作?
任何输入都会非常有帮助。
The Deduplicate
转换的工作原理是将整个元素放入键中,然后执行键分组操作(在本例中是有状态的 ParDo)。由于 Beam 与语言无关,因此按键分组是使用元素的编码形式完成的。编码为相同字节的两个元素是“相等的”,而编码为不同字节的两个元素是“不相等的”。
A 确定性编码器是一个关于语言(如 Java)中的相等性如何与 Beam 相等性相关的概念。这意味着如果两个Java对象根据Java相等equals()
那么它们必须具有相同的编码字节。对于字符串、数字、数组等简单数据,这很容易。思考是什么造就了一名编码员是有帮助的non-确定性。例如,当编码两个Map
实例,它们可能是equals()
在 Java 级别,但键值对以不同的顺序编码,这使得它们对于 Beam 来说不相等。
如果您有一个不确定性编码器MyModel
, then Deduplicate
将无法正常工作,并且最终会得到重复项,因为 Beam 认为不同编码的对象是不平等的。
自动获得高质量确定性编码器的最简单方法可能是利用 Beam 的模式推断:https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types。您需要确保所有字段都可以确定性地编码。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)