现有内部主题具有无效分区

2024-01-28

当在只有一个 Kafka 代理的测试设置中启动我们的 Kafka Streams 应用程序时,我们大约在 15 次运行中看到以下错误:

org.apache.kafka.streams.errors.StreamsException: Existing internal topic alarm-message-streams-by-organization-repartition has invalid partitions: expected: 32; actual: 12. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.

当我们看到上面的错误时,实际分区数有所不同(预期为 32,实际高于 0 且低于 32)。

我们正在执行org.apache.kafka.streams.KafkaStreams#cleanUp打电话之前org.apache.kafka.streams.KafkaStreams#start。 Kafka 代理在没有数据的情况下启动(使用https://hub.docker.com/r/wurstmeister/kafka/ https://hub.docker.com/r/wurstmeister/kafka/)对于每次测试运行。

当查看 Kafka 代理的日志时,我们看到以下内容:

2018-10-22 18:41:31,373] INFO Topic creation Map(
    alarm-message-streams-by-organization-repartition-19 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-22 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-0 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-7 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-23 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-1 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-24 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-2 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-30 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-5 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-21 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-8 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-14 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-15 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-6 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-16 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-31 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-25 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-9 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-20 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-29 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-13 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-26 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-17 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-4 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-10 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-3 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-11 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-12 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-28 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-27 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-18 -> ArrayBuffer(42)
) (kafka.zk.AdminZkClient)

看起来该主题是使用预期的分区数 (32) 创建的。后来,在同一个日志中,似乎有再次创建主题的请求。我们不知道为什么会发生这种情况,但至少请求仍然包含预期的分区数量 (32):

[2018-10-22 18:43:29,851] INFO [Admin Manager on Broker 42]: Error processing create topic request for topic alarm-message-streams-by-organization-repartition with arguments (numPartitions=32, replicationFactor=1, replicasAssignments={}, configs={cleanup.policy=delete, segment.bytes=52428800, segment.ms=600000, retention.ms=9223372036854775807, segment.index.bytes=52428800}) (kafka.server.AdminManager)
org.apache.kafka.common.errors.TopicExistsException: Topic 'alarm-message-streams-by-organization-repartition' already exists.

在我们使用 6 个 Kafka 代理运行的非测试中,我们从未见过这种情况发生。但是,我们运行的测试运行次数明显多于部署到非测试的次数。

注意:导致错误的主题并不总是相同。

该错误导致我们的测试设置不稳定,因此我们想了解它发生的原因并进行处理。有人可以提供一些关于 Kafka Streams 行为的见解吗?

我们正在使用 Kafka 和 Kafka Streams 2.0.0。


似乎从 Kafka 集群(即您的单个代理)收到了不完整/不正确的元数据。在启动时(或者更准确地说,在每次重新平衡中),Kafka Streams 检查是否存在具有预期分区数量的内部主题。如果主题不存在,则会创建它(这应该在应用程序的生存时间内只发生一次)。如果存在且具有正确数量的分区,则使用该主题。如果主题存在的分区数量不正确,则会引发您报告的异常。

Calling KafkaStreams#cleanup()这里不应该有任何影响。它不一样StreamResetter您可以通过以下方式致电bin/kafka-streams-application-reset.sh (cf. https://kafka.apache.org/20/documentation/streams/developer-guide/app-reset-tool.html https://kafka.apache.org/20/documentation/streams/developer-guide/app-reset-tool.html)

目前我不知道问题的根本原因是什么,即为什么 Kafka Streams 收到了不正确的主题元数据。希望这可以帮助。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

现有内部主题具有无效分区 的相关文章

  • Spring Cloud Kafka Streams 中的错误处理

    我正在使用 Spring Cloud Stream 和 Kafka Streams 假设我有一个处理器 它是一个将字符串的 KStream 转换为 CityProgrammes 的 KStream 的函数 它调用 API 按名称查找城市 并
  • 变更日志/重新分区主题的复制因子应该是多少

    我知道可以为 kafka 流配置复制因子这些内部主题 我们的应用程序用于复制因子为 3 的普通应用程序主题 但到目前为止我还没有为变更日志 重新分区主题配置复制因子 而我的假设是如果一个经纪人死亡 或由于某种原因领导者发生变化 kafka
  • 嵌入式Kafka:KTable+KTable leftJoin产生重复记录

    我来寻求神秘的知识 首先 我有两对主题 每对中的一个主题融入另一个主题 后面的主题形成两个KTable 用于KTable KTable leftJoin 问题是 当我向任一 KTable 生成一条记录时 leftJoin 会生成三个记录 我
  • 卡夫卡流 RoundRobinPartitioner

    我编写了一个kafka流代码 使用kafka 2 4 kafka客户端版本和kafka 2 2服务器版本 我的主题和内部主题有 50 个分区 我的 kafka 流代码具有 selectKey DSL 操作 并且我有 200 万条使用相同 K
  • 与 KafkaStreams 的窗口结束外连接

    我有一个 Kafka 主题 我希望消息具有两种不同的密钥类型 旧的和新的 IE 1 new 1 old 2 new 2 old 密钥是唯一的 但有些可能会丢失 现在 使用 Kotlin 和 KafkaStreams API 我可以记录具有相
  • Kafka Stream groupBy 行为:聚合的许多中间输出/更新

    我正在尝试使用 Kafka Stream 来聚合人们的某些属性 我有一个像这样的卡夫卡流测试 new ConsumerRecordFactory Array Byte Character input new ByteArraySeriali
  • 新建的 KTable 不返回任何内容

    我正在尝试使用 KTable 来消费来自 Kafka 主题的事件 但是 它什么也没返回 当我使用 KStream 时 它返回并打印对象 这实在是太奇怪了 生产者和消费者可以在这里找到 https github com pavankjadda
  • 了解 kafka 流分区分配器

    我有两个主题 一个有 3 个分区 一个有 48 个分区 最初我使用默认分配器 但是当消费者 kubernetes 中的 pod 崩溃时我遇到了一些问题 发生的情况是 当 Pod 再次启动时 它从具有 3 个分区的主题重新分配分区 并从具有
  • 卡夫卡消费者重新平衡时间太长

    我有一个 Kafka Streams 应用程序 它从几个主题获取数据并连接数据并将其放入另一个主题中 卡夫卡配置 5 kafka brokers Kafka Topics 15 partitions and 3 replication fa
  • Kafka Streams.allMetadata() 方法返回空列表

    所以我正在尝试使用 Kafka 流进行交互式查询 我有 Zookeeper 和 Kafka 在本地运行 在 Windows 上 我使用 C temp 作为 Zookeeper 和 Kafka 的存储文件夹 我已经设置了这样的主题 kafka
  • Kafka - 流与主题

    Kafka主题和流有什么区别 我以为两者是一样的 这位医生说create stream from a topic这引起了混乱 https docs ksqldb io en latest developer guide create a s
  • 如何以自定义方式从主题恢复全局存储?

    假设我在从主题获取数据后将数据存储在 Globalstore 中时正在进行一些自定义处理 即我正在根据 message 的值创建自定义键 在本地删除状态后 它会以相同的方式再次恢复 Globalstore 吗 override def pr
  • RocksDb sst 文件的 GUI 查看器

    我正在与 Kafka 合作 将数据保存到rocksdb 中 现在我想看看 Kafka 创建的数据库键和值 我下载了 FastNoSQL 并尝试但失败了 该文件夹包含 sst 文件 日志文件 当前文件 身份文件 锁定文件 日志文件 清单文件
  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • TopologyTestDriver 在 KTable 聚合上发送错误消息

    我有一个聚合在 KTable 上的拓扑 这是我创建的通用方法 用于根据我拥有的不同主题构建此拓扑 public static
  • 将数据从 Kafka 存储传输到 Kafka 主题

    我想在卡夫卡做这样的事情 继续将数据存储在 KStream Ktable Kafka store 中 当我的应用程序收到特定事件 数据时 仅将上述存储中的特定数据集发送到主题 我们可以在卡夫卡中做到这一点吗 我认为单独使用 Kafka 消费
  • Kafka Streams - 如何扩展 Kafka 存储生成的变更日志主题

    我有多个冗余应用程序实例 它们想要使用主题的所有事件并独立存储它们以进行磁盘查找 通过rocksdb 为了便于论证 我们假设这些冗余消费者正在服务无状态 http 请求 因此 负载不是使用 kafka 共享的 而是使用 kafka 将数据从
  • 如何检测 KTable 连接的哪一侧触发了更新?

    当您在 Kafka 中连接两个表时 每次更新两个 KTable 之一时 您的输出 Ktable 也会更新 想象一下你正在加入Customers与一个列表Orders你已经适当减少了 再次想象一下 您使用此连接的结果来为最终客户提供特别优惠和
  • Kafka Streams 反序列化处理程序

    我正在尝试在反序列化中使用 LogAndContinueExceptionHandler 当发生错误时 通过成功记录错误并继续 它可以正常工作 但是 假设我的传入消息有连续的错误流 我停止并重新启动 kafka 流应用程序 然后我看到失败并
  • 为什么我的 Kafka Streams 拓扑无法正确重放/重新处理?

    我有一个如下所示的拓扑 KTable

随机推荐

  • 当属性名称包含空格和保留字时,将 JSON 映射到 C# 类

    我正在使用 REST API 服务 在服务响应 JSON 中 存在包含空格和保留字的属性名称 我试图将其映射到 C 类 但无法分配与字段名称相同的属性名称 目前 只有当字段的 JSON 属性和 C 类名完全匹配时 映射才会成功 否则该值将如
  • 如何仅使用 math.h 将双精度数转换为字符串?

    我正在尝试将双精度数转换为本机 NT 应用程序中的字符串 即仅依赖于ntdll dll 不幸的是 ntdll 的版本vsnprintf不支持 f等 迫使我自己实现转换 之前所提ntdll dll只出口其中的一小部分math h功能 floo
  • 如何在单个浏览器页面上向 Dash 应用程序添加多个图表?

    如何在同一页面上添加多个图中显示的图表 我试图将 html Div 组件添加到以下代码中以更新页面布局 以在单页上添加更多图形 但这些新添加的图形不会显示在页面上 只有旧图形显示在图片中可见 我应该修改什么元素 比如说在浏览器上的破折号应用
  • maven-shade-plugin 报告:创建着色 jar 时出错:...target/classes(是一个目录)

    当使用 m2eclipse 工具在 eclipse 中为配置为以下项目的项目运行 Maven 构建时Maven 阴影插件 https maven apache org plugins maven shade plugin 构建失败并显示以下
  • ResponseEntity 不接受文本/csv:Spring Boot

    我正在尝试创建一个接受 CSV 和 json 正文请求等文件的 API 我尝试使用ResponseEntitySpring Boot 中的对象 端点如下所示 PostMapping value csv consumes MediaType
  • 左侧 div 在 bootstrap 中未正确对齐[重复]

    这个问题在这里已经有答案了 我的代码中已经有粘性标头 我正在尝试在左侧添加一个粘性 div 最初看起来不错 当我尝试滚动内容时 布局正在改变 此外 在最小尺寸 移动尺寸 时 左侧 div 未正确对齐 请建议 var onResize fun
  • 如何通过 RxJs 合并或 groupBy toPromise?

    我有以下方法返回结果 如下所示 result status 200 status 200 status 400 我需要使用状态值对结果进行分组 并且对于上面的示例结果仅返回 2 个结果 而不是 3 个 update this demoSer
  • 这个Makefile 有什么问题吗?

    当我在以下 Makefile 上运行 make all 时 出现此错误 Makefile 5 缺少分隔符 停止 这是什么问题以及如何修复它 LEX lex YACC yacc CC gcc calcu y tab o lex yy o CC
  • GCC 左移溢出

    下面的小程序在 Mac 上使用 GCC 版本 4 2 1 Apple Inc build 5664 非常尴尬 include
  • Node+Express+MongoDB Native Client 性能问题

    我正在使用 MongoDB 测试 Node js ExpressJS Fastify Python Flask 和 Java 带有 webflux 的 Spring Boot 的性能 我将所有这些示例应用程序相继托管在同一台服务器上 因此所
  • 如何调试 GLSL 着色器?

    我需要调试 GLSL 程序 但我不知道如何输出中间结果 是否可以使用 GLSL 进行一些调试跟踪 例如使用 printf 而不使用像 glslDevil 这样的外部软件 您无法轻松地从 GLSL 内部与 CPU 进行通信 使用 glslDe
  • 如何仅显示一次网站预加载器

    我向我的网站添加了预加载器 每次访问该网站时都会播放预加载器动画 我希望它在每次访问域名时只播放一次 任何点击网站上的主页按钮或浏览器中的后退按钮我希望跳过预加载器 我希望它在任何时候在新选项卡或新浏览器窗口中打开时都显示出来 我尝试添加c
  • Python 日志记录:覆盖日志时间

    下列的Python 的文档 http docs python org library logging html logging Formatter formatTime 我正在尝试覆盖logging Formatter converter以
  • 新类型比枚举更快吗?

    根据本文 http www haskell org haskellwiki Performance Data types 就 GHC 而言 枚举不算作单构造函数类型 因此当用作严格构造函数字段或严格函数参数时 它们不会从解包中受益 这是 G
  • 如何处理 SVG 像素捕捉

    我正在尝试使用路径元素渲染两条 svg 线 第一行宽度为 1px 并且很锐利 第二条线宽度为 2px 并且很模糊两者的笔划宽度相同 如何解决这个问题
  • 防止在 Ruby 中将字符串转换为八进制数

    假设我们有以下 ruby 代码 require yaml h key gt step1 gt 0910 1223 puts h to yaml 0910 是一个字符串 但是之后to yaml转换 字符串变成八进制数 key step1 09
  • 哪一个 NoSQL 数据库(如果有)可以为查询结果集提供*更改*流?

    哪个 NoSQL 数据库 如果有的话 可以提供changes到查询结果集 有人能给我举一些例子吗 首先 我相信没有一个 SQL 数据库提供此功能 我是对的吗 我需要能够指定任意 简单的查询 其在 SQL 中的等价形式可以写成 SELECT
  • 控制 zenity 窗口中内容的大小?

    我可以控制 a 的大小zenity http en wikipedia org wiki Zenity窗口与 width和 height论点 zenity info text This is an information box width
  • Angular ui-select:如何仅将选定的值绑定到 ng-model

    scope property new Property scope property propertyType scope propertyTypes value ResidentialPlot name Residential Plot
  • 现有内部主题具有无效分区

    当在只有一个 Kafka 代理的测试设置中启动我们的 Kafka Streams 应用程序时 我们大约在 15 次运行中看到以下错误 org apache kafka streams errors StreamsException Exis