我在 Apache Flink 中使用 Kafka Connector 来访问由汇流卡夫卡.
除了 schema 注册表 url 之外ConfluentRegistryAvroDeserializationSchema.forGeneric(...)
期待“读者”模式。
我不想提供读取模式,而是想使用同一作者的模式(在注册表中查找)来读取消息,因为消费者不会有最新的模式。
FlinkKafkaConsumer010<GenericRecord> myConsumer =
new FlinkKafkaConsumer010<>("topic-name", ConfluentRegistryAvroDeserializationSchema.forGeneric(<reader schema goes here>, "http://host:port"), properties);
myConsumer.setStartFromLatest();
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html“使用这些反序列化模式记录将与从模式注册表检索并转换为静态提供的模式一起读取”
由于我不想在消费者端保留架构定义,如何使用编写者的架构反序列化来自 Kafka 的 Avro 消息?
感谢你的帮助!
我认为不能直接使用ConfluentRegistryAvroDeserializationSchema.forGeneric
。它旨在与读者模式一起使用,并且他们有对此进行检查的先决条件。
你必须实施你自己的。两个重要的东西:
- Set
specific.avro.reader
为 false(否则您将获得特定记录)
- The
KafkaAvroDeserializer
必须延迟初始化(因为它本身不可序列化,因为它保存对模式注册表客户端的引用)
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
public class KafkaGenericAvroDeserializationSchema
implements KeyedDeserializationSchema<GenericRecord> {
private final String registryUrl;
private transient KafkaAvroDeserializer inner;
public KafkaGenericAvroDeserializationSchema(String registryUrl) {
this.registryUrl = registryUrl;
}
@Override
public GenericRecord deserialize(
byte[] messageKey, byte[] message, String topic, int partition, long offset) {
checkInitialized();
return (GenericRecord) inner.deserialize(topic, message);
}
@Override
public boolean isEndOfStream(GenericRecord nextElement) {
return false;
}
@Override
public TypeInformation<GenericRecord> getProducedType() {
return TypeExtractor.getForClass(GenericRecord.class);
}
private void checkInitialized() {
if (inner == null) {
Map<String, Object> props = new HashMap<>();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
SchemaRegistryClient client =
new CachedSchemaRegistryClient(
registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
inner = new KafkaAvroDeserializer(client, props);
}
}
}
env.addSource(
new FlinkKafkaConsumer<>(
topic,
new KafkaGenericAvroDeserializationSchema(schemaReigstryUrl),
kafkaProperties));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)