Apache Beam:如何在使用重复数据删除功能时解决“ParDo 需要确定性密钥编码器才能使用状态和计时器”

2024-05-26

我正在尝试使用 Apache Beam 的重复数据删除功能对来自 Google Cloud Pubsub 的输入消息进行重复数据删除。但是,我创建后遇到错误KV<String, MyModel>配对并将其传递给Deduplicate转换。

Error:

ParDo requires a deterministic key coder in order to use state and timers

Code:

PCollection<KV<String, MyModel>> deduplicatedEvents =
    messages
        .apply(
            "CreateKVPairs",
            ParDo.of(
                new DoFn<MyModel, KV<String, MyModel>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(KV.of(c.element().getUniqueKey(),c.element()));
                  }
                }))
        .apply(
            "Deduplicate",
            Deduplicate.<KV<String, MyModel>>values());

我应该如何创建可以将字符串编码/解码为密钥的确定性编码器才能使其工作?

任何输入都会非常有帮助。


The Deduplicate转换的工作原理是将整个元素放入键中,然后执行键分组操作(在本例中是有状态的 ParDo)。由于 Beam 与语言无关,因此按键分组是使用元素的编码形式完成的。编码为相同字节的两个元素是“相等的”,而编码为不同字节的两个元素是“不相等的”。

A 确定性编码器是一个关于语言(如 Java)中的相等性如何与 Beam 相等性相关的概念。这意味着如果两个Java对象根据Java相等equals()那么它们必须具有相同的编码字节。对于字符串、数字、数组等简单数据,这很容易。思考是什么造就了一名编码员是有帮助的non-确定性。例如,当编码两个Map实例,它们可能是equals()在 Java 级别,但键值对以不同的顺序编码,这使得它们对于 Beam 来说不相等。

如果您有一个不确定性编码器MyModel, then Deduplicate将无法正常工作,并且最终会得到重复项,因为 Beam 认为不同编码的对象是不平等的。

自动获得高质量确定性编码器的最简单方法可能是利用 Beam 的模式推断:https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types。您需要确保所有字段都可以确定性地编码。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Beam:如何在使用重复数据删除功能时解决“ParDo 需要确定性密钥编码器才能使用状态和计时器” 的相关文章

随机推荐