我正在使用 Kafka 流,并且想要将一些消费者偏移量从 Java 重置到开头。KafkaConsumer.seekToBeginning(...)
听起来是正确的做法,但我使用 Kafka Streams:
KafkaStreams streams = new KafkaStreams(builder, props);
...
streams.start();
我想根据我定义的具体流管道,这会在后台创建多个消费者。我可以访问这些吗?或者还有其他方法可以以编程方式重置偏移量吗?
基于 Hans Jespersens 的回答,我成功地使用此代码完成了脚本在 Java 代码中执行的操作:
import kafka.tools.StreamsResetter;
StreamsResetter resetter = new StreamsResetter();
String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER};
resetter.run(args);
该类是我使用以下命令导入到 Maven 中的 kafka 核心库的一部分:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)