We have the following topology with two transformers, each of which uses a persistent state store:
kStreamBuilder.stream(inboundTopicName)
.transform(() -> new FirstTransformer(FIRST_STATE_STORE), FIRST_STATE_STORE)
.map((key, value) -> ...)
.transform(() -> new SecondTransformer(SECOND_STATE_STORE), SECOND_STATE_STORE)
.to(outboundTopicName);
Kafka is configured with auto.offset.reset: latest. After the application starts, I see two internal compacted topics created (as expected): appId_inbound_firstStateStore-changelog and appId_inbound_secondStateStore-changelog
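For reference, the relevant part of the Streams configuration would look roughly like this (the application id shown is a placeholder; only the auto.offset.reset value is confirmed above):

```properties
# application.id doubles as the consumer group.id and as the
# prefix of the internal *-changelog topic names
application.id=appId_inbound
# Forwarded to the embedded consumers; only applies when no
# committed offset exists for a source-topic partition
auto.offset.reset=latest
```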
Our application was down for two days. After we started it again, messages in some partitions (we have multiple partitions) were reprocessed from the beginning.
I know that on Kafka brokers before version 2.0, committed offsets are kept for roughly one day (offsets.retention.minutes defaults to 1440), so our offsets should have been removed by retention. But why are messages reprocessed from the beginning when we use auto.offset.reset: latest? Perhaps it is somehow related to the stateful operations or the internal changelog topics.
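To make the timing concrete, the retention arithmetic from the paragraph above can be sketched in plain Java (the 1440-minute figure is the pre-2.0 broker default for offsets.retention.minutes, not a value taken from our configuration):

```java
import java.time.Duration;

public class OffsetRetentionCheck {
    public static void main(String[] args) {
        // Default offsets.retention.minutes on pre-2.0 brokers (e.g. 0.11.0.2)
        Duration offsetRetention = Duration.ofMinutes(1440); // 24 hours
        // The application was down for two days
        Duration downtime = Duration.ofDays(2);

        // If downtime exceeds the retention window, the committed offsets are
        // deleted and the consumer falls back to its auto.offset.reset policy
        boolean offsetsExpired = downtime.compareTo(offsetRetention) > 0;
        System.out.println("committed offsets expired: " + offsetsExpired);
    }
}
```

So by the time the application rejoined, the broker no longer had committed offsets for the group, which is why the reset policy matters at all here.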
I see the following log messages (most of them repeated many times):
StoreChangelogReader Restoring task 0_55's state store firstStateStore from beginning of the changelog
Fetcher [Consumer clientId=xxx-restore-consumer, groupId=] Resetting offset for partition xxx-55 to offset 0
ConsumerCoordinator Setting newly assigned partitions
ConsumerCoordinator Revoking previously assigned partitions
StreamsPartitionAssignor Assigned tasks to clients
AbstractCoordinator Successfully joined group with generation
StreamThread partition revocation took xxx ms
Unsubscribed all topics or patterns and assigned partitions
AbstractCoordinator (Re-)joining group
Attempt to heartbeat failed since group is rebalancing
AbstractCoordinator Group coordinator xxx:9092 (id: xxx rack: null) is unavailable or invalid, will attempt rediscovery
FetchSessionHandler - [Consumer clientId=xxx-restore-consumer, groupId=] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: org.apache.kafka.common.errors.DisconnectException
Kafka broker version: 0.11.0.2; Kafka Streams version: 2.1.0