EDIT
在较新的 Kafka 客户端中,实施Serializer
而不是Encoder
.
编写自定义序列化器所需的内容是:
- Implement
Encoder
with an object specified for the generic
- 供应一个
VerifiableProperties
需要构造函数
- 覆盖
toBytes(...)
确保返回字节数组的方法
- 将序列化器类注入
ProducerConfig
为生产者声明自定义序列化器
正如您在问题中指出的那样,Kafka 提供了一种为生产者声明特定序列化器的方法。序列化器类设置在ProducerConfig
实例并且该实例用于构造所需的Producer
class.
如果你关注Kafka 的 Producer 示例你将构建ProducerConfig
via a Properties
目的。构建属性文件时,请务必包括:
props.put("serializer.class", "path.to.your.CustomSerializer");
使用您希望 Kafka 在将消息附加到日志之前用于序列化消息的类的路径。
创建 Kafka 理解的自定义序列化器
编写 Kafka 可以正确解释的自定义序列化程序需要实现Encoder[T]
Kafka 提供的 scala 类。在java中实现traits很奇怪,但以下方法适用于在我的项目中序列化 JSON:
public class JsonEncoder implements Encoder<Object> {
private static final Logger logger = Logger.getLogger(JsonEncoder.class);
// instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
private static final ObjectMapper objectMapper = new ObjectMapper();
public JsonEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(Object object) {
try {
return objectMapper.writeValueAsString(object).getBytes();
} catch (JsonProcessingException e) {
logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
}
return "".getBytes();
}
}
你的问题听起来像是你正在使用一个对象(让我们称之为CustomMessage
) 对于附加到日志中的所有消息。如果是这种情况,您的序列化器可能看起来更像这样:
package com.project.serializer;
public class CustomMessageEncoder implements Encoder<CustomMessage> {
public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(CustomMessage customMessage) {
return customMessage.toBytes();
}
}
这将使您的属性配置看起来像这样:
props.put("serializer.class", "path.to.your.CustomSerializer");