卡夫卡模式订阅。新主题未触发重新平衡

2023-11-22

根据有关的文档卡夫卡java文档 if I:

  • 订阅模式
  • 创建与模式匹配的主题

应该进行重新平衡,这使得消费者可以阅读该新主题。但这并没有发生。

如果我停止并启动消费者,它确实会获取新主题。所以我知道新主题符合模式。这个问题可能有重复https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics但这个问题没有任何结果。

我看到 kafka 日志,没有错误,只是没有触发重新平衡。当消费者加入或死亡时会触发重新平衡,但在创建新主题时不会触发(即使将分区添加到现有主题时也不会触发,但这是另一个主题)。

我正在使用 kafka 0.10.0.0 和“New Consumer API”的官方 Java 客户端,这意味着代理 GroupCoordinator 而不是胖客户端 + Zookeeper。

这是示例消费者的代码:

public class SampleConsumer {
public static void main(String[] args) throws IOException {
    KafkaConsumer<String, String> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
        Properties properties = new Properties();
        properties.load(props);
        properties.setProperty("group.id", "my-group");

        System.out.println(properties.get("group.id"));
        consumer = new KafkaConsumer<>(properties);
    }
    Pattern pattern = Pattern.compile("mytopic.+");
    consumer.subscribe(pattern, new SampleRebalanceListener());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("%s %s\n", record.topic(), record.value());
        }
    }
}

}

在生产者中,我将消息发送到名为 mytopic1、mytopic2 等的主题。

如果不触发重新平衡,模式几乎毫无用处。

您知道为什么重新平衡没有发生吗?


该文档提到“模式匹配将定期针对检查时存在的主题进行。”。事实证明,“定期”对应于metadata.max.age.ms 属性。通过将该属性(在我的代码示例中的“consumer.props”内)设置为 5000,我可以看到它每 5 秒检测一次新主题和分区。

根据这个 jira 票证,这是设计的https://issues.apache.org/jira/browse/KAFKA-3854:

JIRA 上的最后一条注释指出,后来创建的与消费者订阅模式相匹配的主题在创建时不会分配给消费者,这似乎是按照设计的。需要对相同模式重复 subscribe() 来处理这种情况。

刷新元数据轮询会执行票证中提到的“重复 subscribe()”。

这是来自 Kafka 0.8 的令人困惑的事情,其中​​真正的触发是基于 Zookeper 手表,而不是轮询。 IMO 0.9 更多的是针对这种情况的降级,而不是“及时”重新平衡,这变成了要么有开销的高频轮询,要么在对新主题/分区做出反应之前需要很长时间的低频轮询。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

卡夫卡模式订阅。新主题未触发重新平衡 的相关文章

随机推荐

  • C# 如何通过给定进程 ID 最小化另一个应用程序?

    我想通过进程 ID 最小化应用程序 我搜索了SO并找到了以下代码 private const int SW MAXIMIZE 3 private const int SW MINIMIZE 6 DllImport user32 dll En
  • brew 安装 php56 后出现库未加载错误

    我正在使用自制软件在 OSX Mavericks 上从 php5 5 升级到 php5 6 我跑 brew install php56 一切都运行并完成得很好 但是当我运行时php v出现以下错误 dyld Library not load
  • Firebase:隐形 reCaptcha 在 React JS 中不起作用

    Overview 你好 我正在使用Firebase 的隐形 reCaptcha用于我的电话号码验证React JS应用 根据文档您需要提供的 Firebaseid e g sign in button 处理登录表单提交的按钮 预期行为 一旦
  • 在 CloudFormation 文件中使用 apt-get 安装软件包

    我有以下 CloudFormation 脚本 正在创建堆栈并启动 Ec2 实例 我可以通过 SSH 连接 但它没有安装软件包 我不确定它失败在哪里 我正在使用Ubuntu 我找不到我的实例上是否安装了 cfn init 还是仅为 Amazo
  • 如何在Java中循环遍历类属性?

    如何动态循环java中的类属性 For eg public class MyClass private type1 att1 private type2 att2 public void function for var in MyClas
  • 对于特定提交,Git-merge 和 Git-cherry-pick 有什么区别?

    a 之间有区别吗 git merge
  • 在 ZipArchive C# .Net 4.5 中创建目录

    ZipArchive 是 ZipArchiveEntries 的集合 添加 删除 Entries 效果很好 但似乎没有目录 嵌套 档案 的概念 理论上 该类与文件系统解耦 因为您可以完全在内存流中创建存档 但是 如果您希望在存档中添加目录结
  • 如何读取“adb shell dumpsys Alarm”输出

    我正在努力正确设置闹钟 并理解取消和重新安排闹钟的机制 我发现有一个 adb 命令可以检索设备上安排的所有警报 但我没有找到解释输出格式的文档 我确实理解 我在这里要求很多解释 所以如果有人会抛出一个包含有关 adb shell dumps
  • Facebook 官方图标 [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 目前不接受答案 我将为 iOS Android 和 Windows Phone 开发与 Facebook 连接的应用程序 我正在寻找http developer f
  • IDataErrorInfo 与 IValidatableObject?

    目前我的业务对象实现IDataErrorInfo 由于我打算在 ASP NET MVC 3 中使用这些库 我认为我应该实现IValidatableObject以及或者也许代替 WPF 可以与IValidatableObject 数据注释如何
  • 等待 selenium 和 c# 中完成下载文件

    我有一些问题 我在单击 Web 应用程序上的图标后下载文件 我的下一步是在下载记录文件之前执行的 我想等到文件下载完毕 有谁知道如何等待吗 我使用以下脚本 filename应该传入 第一部分等待文件出现在磁盘上 适用于 chrome 第二部
  • 这个 bash 叉子炸弹是如何工作的? [复制]

    这个问题在这里已经有答案了 根据维基百科 以下是一个非常优雅的 bash fork 炸弹 它是如何工作的 拆开来看 主要分为三大块 Defines a function It takes no arguments The body of t
  • ADODB.Stream 错误“800a0bbc”写入文件失败

    当我更改托管提供商时 我显示了奇怪的错误 ADODB Stream 错误 800a0bbc 写入文件失败 cp portal upload asp 第 63 行 我提供了所需的权限 它解决了更新 Access DB 问题 但文件上传仍然没有
  • XMLHttpRequest 分块响应,仅读取正在进行的最后一个响应

    我正在将分块数据从 NodeJS 应用程序发送回浏览器 这些块实际上是 json 字符串 我遇到的问题是每次onprogress调用函数时 它会添加完整数据的字符串 这意味着第二个响应块将附加到第一个响应块 依此类推 我只想获取 刚刚 收到
  • 在 R 中使用 grep 对多个加载的包进行搜索函数

    假设我有包裹base dplyr data table tidyr等加载使用sapply sapply c dplyr data table tidyr library character only TRUE 因此 要检查特定包中的函数列表
  • 如何从 MySQL 数据库检索图像并在 html 标签中显示

    我使用 phpmyadmin 创建了一个带有表的 MySQL 数据库 我使用 BLOB 列创建了此表来保存 jpeg 文件 我对 php 变量有疑问 result here 到目前为止我的代码 catalog php img src wid
  • 使用可变参数重载函数

    这不会编译 public class Methods public static void method Integer i System out print A public static void method int i System
  • SQLite 表约束 - 多列上唯一

    我可以在 SQLite 网站上找到这方面的语法 图表 但没有示例 而且我的代码崩溃了 我有其他表在单个列上具有唯一约束 但我想在两列上向表添加约束 这就是我所遇到的导致 SQLiteException 并显示消息 语法错误 的原因 CREA
  • PrimeFaces 禁用按 Enter 键提交

    PrimeFaces 禁用按 Enter 键提交 我正在运行在 WildFly 8 2 Final 上运行的 PrimeFaces 5 1 我有一个对话框 有两个输入数字和两个按钮 第一个 inputNumber 对 ajax 模糊事件进行
  • 卡夫卡模式订阅。新主题未触发重新平衡

    根据有关的文档卡夫卡java文档 if I 订阅模式 创建与模式匹配的主题 应该进行重新平衡 这使得消费者可以阅读该新主题 但这并没有发生 如果我停止并启动消费者 它确实会获取新主题 所以我知道新主题符合模式 这个问题可能有重复https