具有动态数量的并行消费者的 Kafka 工作队列

2024-03-12

我想用Kafka来“分工”。我想将工作实例发布到某个主题,并运行由相同使用者组成的云来处理它们。当每个消费者完成其工作时,它将从该主题中提取下一个工作。每项工作只能由一个消费者处理一次。处理工作非常昂贵,因此我需要在许多机器上运行许多消费者才能跟上。我希望消费者的数量根据需要增加和减少(我计划为此使用 Kubernetes)。

我发现了一种为每个消费者创建唯一分区的模式。这就“分工了”,但是分区的数量是在创建主题时设置的。此外,必须在命令行上创建主题,例如

bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 3 --topic divide-topic --create --replication-factor 1

...

for n in range(0,3):
    consumer = KafkaConsumer(
                     bootstrap_servers=['localhost:9092'])
    partition = TopicPartition('divide-topic',n)
    consumer.assign([partition])
    ...

我可以为每个消费者创建一个独特的主题,并编写自己的代码来将工作分配给这些主题。这看起来很恶心,而且我仍然必须通过命令行创建主题。

具有动态数量的并行消费者的工作队列是一种常见的体系结构。我不可能是第一个需要这个的人。使用 Kafka 的正确方法是什么?


您发现的模式是准确的。请注意,也可以使用以下命令创建主题卡夫卡管理 API http://kafka.apache.org/11/javadoc/org/apache/kafka/clients/admin/AdminClient.html#createTopics-java.util.Collection- and 还可以添加分区 http://kafka.apache.org/documentation/#basic_ops_modify_topic创建主题后(有一些陷阱)。

在 Kafka 中,划分工作和允许扩展的方法是使用分区 http://kafka.apache.org/documentation/#kafka_mq。这是因为在消费者组中,每个分区在任何时候都被单个消费者消费。

例如,您可以有一个具有 50 个分区的主题和一个订阅该主题的消费者组:

  • 当吞吐量较低时,组中只能有少数消费者,他们应该能够处理流量。

  • 当吞吐量增加时,您可以添加使用者(最多可达分区数量(本例中为 50))来承担部分工作。

在这种情况下,50 个消费者是扩展的极限。消费者公开了许多指标(例如延迟),让您可以随时决定是否有足够的指标

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有动态数量的并行消费者的 Kafka 工作队列 的相关文章

随机推荐

  • 如何设置jquery按钮的活动状态

    我有一个使用 jquery 按钮的正确导航 如果用户点击它 页面就会加载 我需要的是 一旦页面重新加载 按钮应该显示它已被选中 如何使用 jquery 按钮执行此操作 为什么不使用 jQuery UI Tab 小部件 您也可以尝试使用按钮集
  • DataGridView:仅当滚动到底部时自动向下滚动

    我有一个程序 它使用 dataGridView 来显示通过向 dataGridView 添加行来每秒自动更新的数据 当我想在开头阅读一些内容时 我会向上滚动 即使数据更新 滚动条也不会向下滚动 这很好 但我希望滚动条仅在位于 dataGri
  • 如何在 LibreOffice 中运行 python 宏?

    当我去工具 gt 宏 gt 组织宏 gt Python我得到这个对话框 It is 不可能创建新的 Python 宏 显然 LibreOffice 有没有Python编辑器所以我必须在其他地方编写宏 然后执行它们 但是我不知道where放置
  • 我不断收到此错误:“阅读器关闭时调用 Read 的尝试无效”

    这是我的代码 我关闭并打开阅读器 但它仍然无法工作 几个线程可以同时访问这个函数 但是有一个锁 它一开始会工作几次 但迟早我会收到异常 阅读器关闭时调用 Read 的尝试无效 private IList
  • 在 El Capitan 10.11.6 上安装 Tensorflow 1.10

    我试图在我的旧Mac上安装tensorflow 1 10 但每次都会遇到同样的问题 一旦我启动 python shell 我就会收到以下错误 我确实尝试先将其安装在 virtualenv 中 之后 我尝试仅使用 pip 安装它并得到相同的错
  • Tempus Dominus Bootstrap4 需要 moment.js。 (日期时间选择器)

    我正在尝试使用 Tempus Dominus Bootstrap4 添加 DateTimePicker 但我收到以下错误 I use Laravel as a front end So I use laravel mix 我的刀片文件在下面
  • .htaccess 重写规则中的模式干扰

    在 htaccess 中定义重写规则时 我遇到了模式干扰问题 我试图重写的网站链接是 example com item work gt example com work example com item work tile x gt exa
  • 使用 ActiveAdmin 进行多步骤表单?

    是否可以使用 ActiveAdmin 创建多步骤表单 如果没有 是否可以在提交表单后添加另一个重定向到的页面 不是默认索引 显示或表单页面 我自己也一直在苦恼这个问题 我发现您可以使用 ActiveAdmin 文件中的集合操作添加自己的页面
  • Gwt 2.8-rc1 超级开发模式不适用于 IntelliJ IDEA 2016.2.5

    我的项目在超级开发模式下使用 GWT 2 7 和 2 8 beta1 正确启动 不幸的是 自 2 8 rc1 发布以来 它没有启动 看起来 GWT 项目依赖项配置不正确 IDE IntelliJ IDEA 2016 2 5 C Progra
  • 如何在 karma angularjs 中对 setInterval 进行单元测试

    app directive shuffleBlocks function timeout return link function sco ele att if itemCnt lt 1 return Trigger function fu
  • UITableViewCell 布局在重复使用单元格之前不会更新

    我有一个 UITableView 其中填充了自动调整大小的单元格 UITableView设置相当简单 tableView estimatedRowHeight 70 tableView rowHeight UITableViewAutoma
  • 将 angularjs 值传递给 PHP 变量

    我从 AngularJS 和 ngStorage 开始 我可以成功保存并显示数据 我像以前一样显示值 myobj session 我想将任何存储的值传递到 php 变量中 下面显示的是我的想象逻辑 我知道那是行不通的 我的问题是如何以正确的
  • 如何让 jest 使用 ES6 依赖项

    所以我有一个依赖包 我将其拉入我的 node modules 文件夹中 这个包中有一个像这样的导出 Object
  • PHP sleep() 导致 CPU 使用率高

    我正在运行一个大部分时间处于睡眠状态的 CLI 脚本 每隔 10 秒左右 脚本就会执行一些操作 问题是 脚本在睡眠时 CPU 使用率为 94 我设置的方法是 while 1 sleep 10 doStuff 虽然这按预期工作 但存在一个明显
  • 字符+字符=整数?为什么?

    为什么要加两个char在 C 中结果为int type 例如 当我这样做时 var pr R G B Y P the pr变量变成int类型 我希望它是一个string类型值为 RGBYP 为什么C 要这样设计呢 默认实现不是添加两个cha
  • 从Excel VBA的下拉列表中选择特定项目

    我正在为我的办公室设计一个仪表板 这一切都有效 但我想添加一个选项 而不是在下拉列表中搜索 250 多个项目 您还可以单击一个单元格 下拉列表将更改为该值 并且分配的宏将为该下拉列表运行 到目前为止 我不知道如何让 vba 从下拉列表中选择
  • 具有列表视图的 Android 小部件正在刷新具有丑陋的短“闪烁”效果的项目

    我想制作一个带有 ListView 的小部件 您可以在其中添加 listItems 来显示计数器计时器 以查看您有多少时间来处理某个事件 这是我第一次使用小部件 我不知道我的方法是好还是坏 到目前为止我得到了这个 AppWidgetProv
  • Javascript - 所有嵌套的 forEach 循环完成后的回调

    我确信这是一个相当简单的任务 但我现在无法全神贯注 我有一组嵌套的 forEach 循环 当所有循环运行完毕时 我需要一个回调 我愿意使用 async js 这就是我正在处理的 const scanFiles function accoun
  • 如何使用 BiWeekly 库和 Java Mail API 创建现有事件并发送更新?

    我在用着BiWeekly http sourceforge net projects biweekly 库来创建 VEVENT 然后使用以下命令发送它Java 邮件 API https java net projects javamail
  • 具有动态数量的并行消费者的 Kafka 工作队列

    我想用Kafka来 分工 我想将工作实例发布到某个主题 并运行由相同使用者组成的云来处理它们 当每个消费者完成其工作时 它将从该主题中提取下一个工作 每项工作只能由一个消费者处理一次 处理工作非常昂贵 因此我需要在许多机器上运行许多消费者才