设置自定义编码器和处理参数化类型

2023-12-02

我有两个与数据流管道面临的编码器问题相关的问题。

  • 如何为我的自定义数据类型设置编码器?该类仅包含三个项目 - 两个双精度值和另一个参数化属性。我尝试使用 SerializedCoder 注释类型,但最终仍然出现错误“com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException:无法根据类接口 java.util.Set 的值提供编码器:没有注册 CoderFactory为了班级。”该集实际上包含参数化的自定义数据类型 - 所以我假设自定义数据类型是问题所在。我找不到足够的文档/示例来说明执行此操作的正确方法。如果有的话,请指出正确的地方。
  • 即使没有自定义数据类型,每当我尝试切换到 Transform 函数的参数化版本时,都会导致编码器错误。具体来说,在参数化的复杂转换中,ParDo 使用参数化类型,但是当我在 ParDo 之后对生成的 PCollection 应用 Combine.PerKey 时,会导致 CoderNotFoundException。

关于这两个项目的任何帮助都会有所帮助,因为我现在已经坚持了一段时间了。


看起来你已经被两个问题困扰了。感谢您引起我们的注意!幸运的是,在我们改进的同时,有一些简单的解决方法可以解决这两个问题。

第一个问题是默认编码器注册表没有用于映射的条目Set.class to SetCoder。我们已备案GitHub 问题 #56跟踪其分辨率。同时,您可以使用以下代码来执行所需的注册:

pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);

第二个问题是参数化类型目前需要在编码器注册表中进行高级处理,因此@DefaultCoder不会受到尊重。我们已备案Github 问题 #57来跟踪这个。确保的最好方法是SerializableCoder到处都用于CustomType就是注册一个CoderFactory对于您的类型,将返回SerializableCoder。假设你的类型是这样的:

public class CustomType<T extends Serializable> implements Serializable {
  T field;
}

然后下面的代码注册了一个CoderFactory从而产生适当的SerializableCoder实例:

pipeline.getCoderRegistry().registerCoder(CustomType.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? extends Coder<?>>) {
    // No matter what the T is, return SerializableCoder
    return SerializableCoder.of(CustomType.class);
  }

  @Override
  public List<Object> getInstanceComponents(Object value) {
    // Return the T inside your CustomType<T> to enable coder inference for Create
    return Collections.singletonList(((CustomType<Object>) value).field);
  }
});

现在,每当您使用CustomType在您的管道中,编码器注册表将生成一个SerializableCoder.

注意SerializableCoder is not 确定性的(编码对象的字节不一定等于编码对象的字节)equals())因此使用此编码器编码的值不能用作GroupByKey手术。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

设置自定义编码器和处理参数化类型 的相关文章

随机推荐