根据有关的文档卡夫卡java文档 if I:
应该进行重新平衡,这使得消费者可以阅读该新主题。但这并没有发生。
如果我停止并启动消费者,它确实会获取新主题。所以我知道新主题符合模式。这个问题可能有重复https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics但这个问题没有任何结果。
我看到 kafka 日志,没有错误,只是没有触发重新平衡。当消费者加入或死亡时会触发重新平衡,但在创建新主题时不会触发(即使将分区添加到现有主题时也不会触发,但这是另一个主题)。
我正在使用 kafka 0.10.0.0 和“New Consumer API”的官方 Java 客户端,这意味着代理 GroupCoordinator 而不是胖客户端 + Zookeeper。
这是示例消费者的代码:
public class SampleConsumer {
public static void main(String[] args) throws IOException {
KafkaConsumer<String, String> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
properties.setProperty("group.id", "my-group");
System.out.println(properties.get("group.id"));
consumer = new KafkaConsumer<>(properties);
}
Pattern pattern = Pattern.compile("mytopic.+");
consumer.subscribe(pattern, new SampleRebalanceListener());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("%s %s\n", record.topic(), record.value());
}
}
}
}
在生产者中,我将消息发送到名为 mytopic1、mytopic2 等的主题。
如果不触发重新平衡,模式几乎毫无用处。
您知道为什么重新平衡没有发生吗?
该文档提到“模式匹配将定期针对检查时存在的主题进行。”。事实证明,“定期”对应于metadata.max.age.ms 属性。通过将该属性(在我的代码示例中的“consumer.props”内)设置为 5000,我可以看到它每 5 秒检测一次新主题和分区。
根据这个 jira 票证,这是设计的https://issues.apache.org/jira/browse/KAFKA-3854:
JIRA 上的最后一条注释指出,后来创建的与消费者订阅模式相匹配的主题在创建时不会分配给消费者,这似乎是按照设计的。需要对相同模式重复 subscribe() 来处理这种情况。
刷新元数据轮询会执行票证中提到的“重复 subscribe()”。
这是来自 Kafka 0.8 的令人困惑的事情,其中真正的触发是基于 Zookeper 手表,而不是轮询。 IMO 0.9 更多的是针对这种情况的降级,而不是“及时”重新平衡,这变成了要么有开销的高频轮询,要么在对新主题/分区做出反应之前需要很长时间的低频轮询。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)