Kafka 主题分区

2024-03-04

关于 Kafka 主题和分区的一个简单问题。假设以下场景:

  • Producer1将数据写入Topic1。

  • Producer2向Topic2写入数据

  • Consumer读取Topic 1和Topic 2的数据。

  • Consumer2仅从Topic2读取数据。

问题是:每个Topic内部有多少个分区?难道真的要靠消费者数量来推动并行吗?或者它只是设置到文件 server.config 中的参数?在后一种情况下,有没有办法让不同的主题具有不同数量的分区?


首先要了解的是,主题分区是 Kafka 中的并行单位。在生产者和代理端,对不同分区的写入可以完全并行完成。在消费者方面,Kafka总是将单个分区的数据提供给一个消费者线程。因此,消费者(在消费者组内)的并行度受到正在消费的分区数量的限制。因此,一般来说,Kafka集群中的分区越多,能够实现的吞吐量就越高。

每个Topic内部有多少个分区?这是可配置的。您可以增加分区,但一旦增加,就无法减少它。 Apache Kafka 为我们提供了 alter 命令来更改 Topic 行为和添加/修改配置。我们将使用 alter 命令向现有主题添加更多分区。

以下是将主题“my-topic”的分区数增加到 20 的命令 -

./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic my-topic --partitions 20

您可以使用describe命令验证分区是否已增加,如下所示 -

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-topic

一个主题需要设置多少个分区?请在这里阅读这份写得很好的文档:https://www.confluence.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/ https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 主题分区 的相关文章

  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • 如何使用rest api设置kafka连接auto.offset.reset

    我创建了一个接收器 kafka 连接 将数据转换为其他存储 我想设置auto offset reset as latest当新连接器创建时kafka connect rest api 我已经设定consumer auto offset re
  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache
  • Spark shell (spark 3.0.0) 添加包 confluence kafka 5.5.1 javax.ws.rs-api 问题

    我本地的win10 WSL回到ubuntu 在ubuntu上 我安装了spark3 0 0 confluence平台5 5 1 手动下载 当我尝试运行spark shell或spark submit时 下面是shell示例 spark sh
  • Kafka Streams - 如何扩展 Kafka 存储生成的变更日志主题

    我有多个冗余应用程序实例 它们想要使用主题的所有事件并独立存储它们以进行磁盘查找 通过rocksdb 为了便于论证 我们假设这些冗余消费者正在服务无状态 http 请求 因此 负载不是使用 kafka 共享的 而是使用 kafka 将数据从
  • Kafka不启动空白输出

    我正在努力安装 Kafka 和 Zookeeper 我已经运行了 Zookeeper 并且它当前正在运行 我将所有内容设置为 https dzone com articles running apache kafka on windows
  • Confluence 平台与 apache kafka [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我是 kafka 的新手 对 Confluence 平台很好奇 看来Confluence平台上的用户故事并不多 Confluence平台和Apa
  • 未能在kafka-storm中将偏移量数据写入zookeeper

    我正在设置一个风暴集群来计算实时趋势和其他统计数据 但是我在将 恢复 功能引入到这个项目中时遇到了一些问题 方法是允许上次读取的偏移量kafka spout 源代码为kafka spout来自https github com apache
  • 从kafka获取特定时间段的结果

    这是我的代码 它使用kafka python now datetime now month ago now relativedelta month 1 topic some topic name consumer KafkaConsumer
  • Mesos DCOS 未安装 Kafka

    我正在尝试在 Mesos 上安装 Kafka 看来安装已经成功了 vagrant DevNode dcos dcos package install kafka This will install Apache Kafka DCOS Ser
  • Kafka JDBC Sink Connector,批量插入值

    我每秒收到很多消息 通过 http 协议 50000 100000 并希望将它们保存到 PostgreSql 我决定使用 Kafka JDBC Sink 来实现此目的 消息以一条记录保存到数据库 而不是批量保存 我想在 PostgreSQL
  • 为什么我的 Kafka Streams 拓扑无法正确重放/重新处理?

    我有一个如下所示的拓扑 KTable
  • JDBC Kafka Connector 可以从多个数据库中提取数据吗?

    我想设置一个 JDBC Kafka 连接器集群 并将它们配置为从同一主机上运行的多个数据库中提取数据 我一直在查看 Kafka Connect 文档 似乎在配置 JDBC 连接器后 它只能从单个数据库中提取数据 谁能证实这一点吗 根据您启动
  • 即使在 Kafka 中进行轮询后,当前也不会发生分区分配

    我有 Java 8 应用程序与 Apache Kafka 2 11 0 10 1 0 一起使用 我需要使用seek特征为poll来自分区的旧消息 然而我遇到了一个例外No current assignment for partition每次
  • Kafka Consumer 如何(应该)应对有毒消息

    当 Kafka Consumer 无法反序列化消息时 客户端应用程序是否有责任处理有毒消息 Or Kafka是否会 增加 消息偏移并继续消费有效消息 是否有处理 Kafka 主题上的有毒消息的 最佳实践 当 Kafka 无法反序列化记录时

随机推荐

  • 将几个参数传递给 lapply 的 FUN(以及其他 *apply)

    我有一个关于在使用时将多个参数传递给函数的问题lapply in R 当我使用 lapply 的语法时lapply input myfun 这很容易理解 我可以这样定义 myfun myfun lt function x doing som
  • C# 中的异步/等待和并行 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 列数将图像减半

    我在页面上有很大一部分文本 当屏幕更大时 我可以使用媒体查询来使文本形成列 然而 当我这样做时 有些照片没有完全进入一栏 无论出于何种原因 它都会分割照片并将其一部分显示在一列的底部 并将其余部分显示在下一列中 div 也会发生这种情况 如
  • 如何更新 Windows 最新托管运行程序的 github 操作工作流程文件中的 PATH

    我目前正在尝试将 GitHub 操作工作流程添加到存储库中 要进行 C CMake swig python 开发 即本机 python 库开发 我需要下载并安装 swigwin 并将其提供在PATH 不幸的是 似乎 env Path 在下一
  • 在不影响CloudKit正确性的情况下执行持久历史记录清除的正确方法是什么?

    目前 我们正在使用本地CoreData with CloudKit特征 通过使用NSPersistentCloudKitContainer 为什么我们启用持久历史跟踪功能 由于问题描述于https stackoverflow com a 7
  • 如何在不使用 nginx 的情况下通过 ingress 启用 CORS?

    我正在尝试使用 Kubernetes 设置 RESTful API 应用程序 我有一个准系统设置 其中包含集群 静态 IP 地址 使用 NodePort 类型的公开服务部署的应用程序以及配置了 SSL 托管证书的入口 我需要启用 CORS
  • 如何正确地将sqlite框架添加到Xcode项目中?

    我正在尝试将 SQLite 添加到我的项目中 我检查了构建阶段选项卡下的目标依赖项 它是空的 这是真的 我收到以下错误 无法运行命令 Ld SQLite 该目标可能包括其自己的产品 我正在使用 swift 3 你能帮我么 提前致谢 我目前不
  • 在 HSQLDB 2.0.0-rc8 中选择下一个序列值的“正确”方法

    假设我有一个序列 称为 TEST SEQ 选择下一个值的正确方法是什么 这不起作用 select next value for TEST SEQ 可能是因为它需要一个 FROM 子句 在休眠中查看 HSQLDialect getSequen
  • 帕拉米科。按修改时间获取文件

    localpath U utime sftp stat TestBTEC st mtime last modified datetime fromtimestamp utime if datetime now last modified l
  • 使用类为第三方库创建类型

    我有一个第三方库 它具有以下 ES6 类签名 class Machine constructor options static list callback create options callback 我尝试为此类创建类型声明 但出现一些
  • 在 Vim 中打开 NERDTree 和 Tlist 并排放置

    我正在寻找一种方法来 自动 打开左侧正上方的 NERDTree 和 Tlist 以便每个插件占据屏幕高度的一半 我已经找到了这个问题 https stackoverflow com questions 6005874 opening a w
  • Servlet 中的 JSF 托管 Bean

    有没有办法从 servlet 访问 JSF 托管 bean 在 Servlet 中 您可以通过以下方式获取请求范围的 beans Bean bean Bean request getAttribute beanName 和会话作用域的 be
  • Java 数组效率

    我不能 100 确定该机制正在发挥作用 因此我决定在此发帖以进一步澄清 我正在做一个项目 应该用Java处理大量数据 它必须是Java 我希望它尽可能高效 我所说的高效是指内存和计算速度应该放在第一位 可读性应该放在第二位 现在我有两种方法
  • 使用图像(宽高比填充)和视频制作 AVMutableComposition 以适合宽高比

    我正在尝试使用尺寸始终为 CGSize 375 667 的图像制作新视频 但视频尺寸不同 且 contentMode 为 aspectFit 问题是我无法弄清楚如何使整个视频组合具有正确的尺寸 即图像尺寸 而是视频的自然尺寸和一堆奇怪的结果
  • 批量使用 PowerShell 命令的问题

    我使用 PowerShell 命令从云下载 zip 文件 该命令在 PowerShell 和命令行中都能正常工作 但是 如果我将命令行中的命令插入批处理脚本中 则只会下载 html 为什么该命令在命令行中可以正常工作 但在批处理文件中却不能
  • GET 文件上传如何工作?

    有谁知道怎么办GWT文件上传有效吗 我知道关于FileUpload小部件以及如何使用它 我想知道它的内在机制是什么 我们无法从中获取文件内容FileUpload客户端中的小部件以及它如何发送到服务器 我用谷歌搜索但没有得到解决方案 提前致谢
  • 仅当活动未显示时才显示通知

    我有一个想要处理的后台任务 问题是 当任务完成时 我想调用一个新的 Activity 来向用户显示结果 前提是我的主 Activity 正在显示 否则我只想发送一个通知 以便用户可以看到该操作已完成 并且可以随时打开它 我正在考虑使用一个服
  • 强制从 s3 亚马逊服务器下载

    我一直在开发一个新的网络应用程序 它依赖于亚马逊S3服务器作为存储系统 以及代码点火器作为 PHP 框架 我需要在单击链接时强制下载文件 原始网址如下所示 http www our web com download do 1 jpg 它会生
  • 主构造函数内的 Scala 局部变量

    在 Scala 中如何在主构造函数中定义局部变量 我需要解决这个练习Scala for the impatient book 编写一个具有接受字符串的主构造函数的 Person 类 包含名字 空格和姓氏 例如 new 人 弗雷德 史密斯 提
  • Kafka 主题分区

    关于 Kafka 主题和分区的一个简单问题 假设以下场景 Producer1将数据写入Topic1 Producer2向Topic2写入数据 Consumer读取Topic 1和Topic 2的数据 Consumer2仅从Topic2读取数