看起来你已经被两个问题困扰了。感谢您引起我们的注意!幸运的是,在我们改进的同时,有一些简单的解决方法可以解决这两个问题。
第一个问题是默认编码器注册表没有用于映射的条目Set.class
to SetCoder
。我们已备案GitHub 问题 #56跟踪其分辨率。同时,您可以使用以下代码来执行所需的注册:
pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
第二个问题是参数化类型目前需要在编码器注册表中进行高级处理,因此@DefaultCoder
不会受到尊重。我们已备案Github 问题 #57来跟踪这个。确保的最好方法是SerializableCoder
到处都用于CustomType
就是注册一个CoderFactory对于您的类型,将返回SerializableCoder
。假设你的类型是这样的:
public class CustomType<T extends Serializable> implements Serializable {
T field;
}
然后下面的代码注册了一个CoderFactory从而产生适当的SerializableCoder
实例:
pipeline.getCoderRegistry().registerCoder(CustomType.class, new CoderFactory() {
@Override
public Coder<?> create(List<? extends Coder<?>>) {
// No matter what the T is, return SerializableCoder
return SerializableCoder.of(CustomType.class);
}
@Override
public List<Object> getInstanceComponents(Object value) {
// Return the T inside your CustomType<T> to enable coder inference for Create
return Collections.singletonList(((CustomType<Object>) value).field);
}
});
现在,每当您使用CustomType
在您的管道中,编码器注册表将生成一个SerializableCoder
.
注意SerializableCoder
is not 确定性的(编码对象的字节不一定等于编码对象的字节)equals()
)因此使用此编码器编码的值不能用作GroupByKey
手术。