When starting our Kafka Streams application in a test setup with only a single Kafka broker, we see the following error in roughly 1 out of 15 runs:
org.apache.kafka.streams.errors.StreamsException: Existing internal topic alarm-message-streams-by-organization-repartition has invalid partitions: expected: 32; actual: 12. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.
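When we see this error, the actual number of partitions varies from run to run (expected is always 32; actual is greater than 0 and less than 32).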
We call org.apache.kafka.streams.KafkaStreams#cleanUp before calling org.apache.kafka.streams.KafkaStreams#start. For each test run, the Kafka broker is started without any data (using https://hub.docker.com/r/wurstmeister/kafka/).
Looking at the Kafka broker's log, we see the following:
[2018-10-22 18:41:31,373] INFO Topic creation Map(
alarm-message-streams-by-organization-repartition-19 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-22 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-0 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-7 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-23 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-1 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-24 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-2 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-30 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-5 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-21 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-8 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-14 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-15 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-6 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-16 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-31 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-25 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-9 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-20 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-29 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-13 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-26 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-17 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-4 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-10 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-3 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-11 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-12 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-28 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-27 -> ArrayBuffer(42),
alarm-message-streams-by-organization-repartition-18 -> ArrayBuffer(42)
) (kafka.zk.AdminZkClient)
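Instead of counting the entries above by hand, the partition count in a "Topic creation Map(...)" excerpt like this one can be checked with a small helper, which is handy when comparing logs across many test runs. This is a hypothetical sketch: the class and method names are ours and not part of Kafka, and it assumes the "<topic>-<partition> -> ArrayBuffer(...)" line format shown in the excerpt above (other broker versions may log this differently).

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical test-side helper (not a Kafka API): counts the distinct
 * partitions per topic in a broker "Topic creation Map(...)" log excerpt.
 * Assumes entries of the form "<topic>-<partition> -> ArrayBuffer(<brokers>)".
 */
final class RepartitionLogCheck {

    // Greedy topic group backtracks to the last "-<digits>", so topic names
    // that themselves contain '-' (or end in digits) are handled.
    private static final Pattern ENTRY =
            Pattern.compile("([\\w.-]+)-(\\d+)\\s*->\\s*ArrayBuffer\\(");

    private RepartitionLogCheck() {}

    /** Maps each topic name to the number of distinct partition indices seen. */
    static Map<String, Integer> partitionsPerTopic(String logExcerpt) {
        Map<String, Set<Integer>> seen = new TreeMap<>();
        Matcher m = ENTRY.matcher(logExcerpt);
        while (m.find()) {
            seen.computeIfAbsent(m.group(1), k -> new TreeSet<>())
                .add(Integer.parseInt(m.group(2)));
        }
        Map<String, Integer> counts = new TreeMap<>();
        seen.forEach((topic, partitions) -> counts.put(topic, partitions.size()));
        return counts;
    }

    public static void main(String[] args) {
        // Two entries copied from the broker log above.
        String excerpt = "Topic creation Map(\n"
                + "alarm-message-streams-by-organization-repartition-19 -> ArrayBuffer(42),\n"
                + "alarm-message-streams-by-organization-repartition-0 -> ArrayBuffer(42)\n"
                + ") (kafka.zk.AdminZkClient)";
        System.out.println(partitionsPerTopic(excerpt));
    }
}
```

Fed the full excerpt above, this should report 32 partitions for alarm-message-streams-by-organization-repartition. Duplicate entries are counted once, since only distinct partition indices are kept.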
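It looks like the topic is created with the expected number of partitions (32). Later in the same log, there appears to be a second request to create the topic. We don't know why this happens, but at least that request also contains the expected number of partitions (32):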
[2018-10-22 18:43:29,851] INFO [Admin Manager on Broker 42]: Error processing create topic request for topic alarm-message-streams-by-organization-repartition with arguments (numPartitions=32, replicationFactor=1, replicasAssignments={}, configs={cleanup.policy=delete, segment.bytes=52428800, segment.ms=600000, retention.ms=9223372036854775807, segment.index.bytes=52428800}) (kafka.server.AdminManager)
org.apache.kafka.common.errors.TopicExistsException: Topic 'alarm-message-streams-by-organization-repartition' already exists.
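We have never seen this happen in our non-test environment, which runs with 6 Kafka brokers. However, we do significantly more test runs than non-test deployments.
Note: the topic that causes the error is not always the same.
The error makes our test setup unstable, so we would like to understand why it happens and how to handle it. Can anyone provide some insight into this Kafka Streams behavior?
We are using Kafka and Kafka Streams 2.0.0.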