将消费者偏移量重置为 Kafka Streams 的开头

2023-11-30

我正在使用 Kafka 流,并且想要将一些消费者偏移量从 Java 重置到开头。KafkaConsumer.seekToBeginning(...)听起来是正确的做法,但我使用 Kafka Streams:

KafkaStreams streams = new KafkaStreams(builder, props);
...
streams.start();

我想根据我定义的具体流管道,这会在后台创建多个消费者。我可以访问这些吗?或者还有其他方法可以以编程方式重置偏移量吗?


基于 Hans Jespersens 的回答,我成功地使用此代码完成了脚本在 Java 代码中执行的操作:

import kafka.tools.StreamsResetter;

StreamsResetter resetter = new StreamsResetter();
String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER};
resetter.run(args);

该类是我使用以下命令导入到 Maven 中的 kafka 核心库的一部分:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>${kafka.version}</version>
    </dependency>
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将消费者偏移量重置为 Kafka Streams 的开头 的相关文章

  • UnknownHostException:名称或服务未知

    我正在尝试使用 com squareup okhttp 中的 OkHttpClient 从 API 返回一些数据 我遇到了一些错误 我最终能够克服这些错误 但我无法克服这个主机异常错误 并且这里似乎没有任何内容足够具体到我的情况能够解决 下
  • 行类型 Spark 数据集的编码器

    我想写一个编码器Row https spark apache org docs 2 0 0 api java index html org apache spark sql Row html输入 DataSet 用于我正在执行的地图操作 本
  • 从通用对象访问字段变量

    我有两节课ClassOne and ClassTwo 更新公共字段data i e public class ClassOne public byte data new byte 10 Thread that updates data an
  • Chrome 崩溃:尝试在空对象引用上调用虚拟方法“long android.view.accessibility.AccessibilityNodeInfo.getSourceNodeId()”

    在处理网页的搜索表单 JavaScript CSS HTML 时 每次单击网络搜索图标并且输入字段获得焦点时 Chrome 浏览器 Android 10 都会崩溃 崩溃报告中的调试堆栈跟踪显示 Attempt to invoke virtu
  • Spring Security 中 Web 忽略和 Http 允许之间的区别?

    这两种方法有什么区别 Override protected void configure HttpSecurity http throws Exception http authorizeRequests antMatchers api p
  • GAE 上奇怪的 500 错误

    我今天开始在我的应用程序上收到此错误 根本不记得更改任何内容 每当我在本地尝试时它都工作正常 但部署后我会收到此错误 EXCEPTION java lang ClassNotFoundException se myApp server My
  • Java - 直观地拖动摆动元素

    有没有类似的解决方案http allen sauer com com allen sauer gwt dnd demo DragDropDemo DragDropDemo html PaletteExample http allen sau
  • 搜索 JTable 时 - 未获得正确的 ID

    所以我尝试在搜索名称后单击表 然后在其他表中编辑它 问题是我没有获得正确的 ID 而只获得第一个 ID JTable https i stack imgur com TnNIq png 搜索行动 https i stack imgur co
  • 在 Java 中对多语言环境字符串进行排序

    我正在尝试按字符串字段 国家 地区 对对象列表进行排序 每个国家 地区都使用其母语 阿根廷 澳大利亚 奥地利 例如 我想要做的是让 出现在 A 国家之后 因为字母 对应于拉丁语 B 我正在尝试使用默认的 Collat er 但非拉丁名称仍然
  • 使用 iText 在内存上生成在磁盘上生成的 PDF

    我正在从 Java 应用程序生成 PDF 并且效果很好 问题是 PDF 在磁盘上生成为 Document documento new Document PageSize A4 25 25 25 25 PdfWriter writer Pdf
  • 如何仅使用命令行运行 Maven 创建的 jar 文件

    我需要一些帮助来尝试使用命令行运行以下 Maven 项目 https github com sarxos webcam capture https github com sarxos webcam capture webcam captur
  • 在 Graal.js 中使用 java 类

    使用 Graal js 如何将 java 类导入到 JS 脚本中 以下代码适用于 Nashorn JJS 但不适用于 Graal js 因为没有Java type 在graal中 我需要在某个时候调用truffle吗 var ArrayLi
  • Preg_match PHP 到 java 的翻译

    我在将 php preg match 转换为 java 时遇到一些问题 我以为我的一切都是正确的 但它似乎不起作用 这是代码 原始PHP Pattern for 44 Character UUID pattern 0 9A F 44 if
  • 使用电子邮件、用户名和密码进行 Firebase 身份验证

    我想知道是否可以使用电子邮件和用户 ID 密码登录 我有一个项目 我希望用户添加一个唯一的号码 实际上是我们公司提供的工作识别号码 以便能够签名参与该计划的人员将继续留在公司就业 即使电子邮件和密码正确但用户 ID 错误 我也需要 fire
  • IntelliJ IDEA:忽略代码覆盖率中的琐碎方法

    在 IntelliJ IDEA 15 0 2 中 如何在测试覆盖率测量期间忽略琐碎的 getter 和 setter 琐碎方法 should be measure public void complex fancy interesting
  • 使用 Arrays.copyOf 复制不同类型的数组时出现问题

    我正在尝试创建一个方法 该方法几乎将任何内容作为参数 并返回带有某些分隔符的值的串联字符串表示形式 public static String getConcatenated char delim Object names String st
  • 注意通知持续时间

    是否可以将抬头通知的持续时间设置为无限 现在它只显示 5 秒 已经尝试过不同的事情 例如更改类别 但持续时间始终为 5 秒 这是我的代码 Notification notification notificationBuilder setCa
  • 我们还需要迭代器设计模式吗? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • javaFX,抛出 NullPointerException,位置是必需的

    我看过其他答案 但没有任何帮助我 抱歉 GUI新手只知道swing的基础知识 这是主课 package application import javafx application Application import javafx fxml
  • 数组所有可能的组合

    我有一个字符串数组 ted williams golden voice radio 我希望这些关键字的所有可能组合采用以下形式 ted williams golden voice radio ted williams ted golden

随机推荐

  • 将嵌套字典转换为 IReadOnlyDictionary

    我正在尝试给出一个IReadOnly 内部参考Collection对象 这在大多数情况下效果很好 但如果我想将包含集合的字典转换为IReadOnlyDictionary含有一个IReadOnlyCollection 这是一个代码示例 var
  • Python Outlook 获取来自发件人的所有电子邮件

    我正在尝试使用 python 来浏览 Outlook 并获取发件人的所有电子邮件 我已经看过但不知道如何做到这一点 我可以按主题获取电子邮件并返回发件人 但我希望获取所有发件人然后返回主题 这就是我用来按主题获取发件人的方法 import
  • 如何从浏览器保存 .pdf?

    我尝试使用在 stackoverflow 上找到的不同方法来保存 pdf 文件 包括FileUtils IO然而 我总是会把它弄坏 当我使用记事本打开损坏的文件时 我得到以下内容
  • 将gridview转换为图像

    我想将 gridview 转换为图像并将其保存到 asp net 3 5 中的本地磁盘 我不知道该怎么做 任何人都可以建议一些东西 提前致谢 以下链接可能对您有帮助 将 DataGridView 转换为位图 使用 Reporting Ser
  • 点击时设置 PHP cookie

    所以我宁愿不使用 JS jQuery 但我似乎无法让它工作 我有一个链接 a href hideupdates hide Hide Updates a 我正在尝试用它来设置 cookie if GET hideupdates hide se
  • 将新值写入数组

    这是我的代码片段 typedef float point2 2 point2 a 90 90 point2 b 90 90 point2 c 90 90 point2 d 90 90 glBegin GL POLYGON glVertex2
  • SQL Developer 不显示 dbms_output

    我正在尝试在 SQL Developer 中编写 PL SQL 代码 但它没有显示输出 我的程序编译成功 这是代码 set serveroutput on declare begin dbms output put line Hi end
  • 使用条件 ?:(三元)运算符的好处

    与标准 if else 语句相比 运算符有哪些优点和缺点 显而易见的是 条件 运算符 处理直接值比较和赋值时更短 更简洁 似乎不像 if else 结构那么灵活 标准如果 否则 可以应用到更多的情况 比如函数调用 通常是不必要的长 每个语句
  • 如何使用 DotNetOpenAuth 检索 google 个人资料?

    我正在尝试使用 DNOA 为我的应用程序提供 OpenId 支持 以便离开我迄今为止一直使用的 Janrain 解决方案 问题是 到目前为止 我拥有的用户拥有基于个人资料的标识符 https www google com profiles
  • 从互联网下载 HTML 后字符串中的字符发生变化

    使用以下代码 我可以从互联网下载文件的 HTML WebClient wc new WebClient string downloadedFile wc DownloadString http www myurl com 但是 有时该文件包
  • Web API 版本控制配置

    我是Mvc新手 尝试编写restful api 我使用Web api类型的应用程序 并尝试创建版本控制 最后我想要像 api v1 values get api v2 values get 这样的链接类型 我尝试在控制器文件夹中创建文件夹
  • 如何获取网站的加载时间和访问量?

    我正在开发一个程序 该程序应该测量我作为输入提供的网站的加载时间和容量 这里我有一些代码只返回网站的响应时间 但我想要总加载时间和项目的总体积 图片 JavaScript HTML 等 public string Loading Time
  • 通过另一个字段选择具有最大日期顺序的数据[重复]

    这个问题在这里已经有答案了 我已经在 SQL 数据库中创建了这个表 index Reg No Payment Payday 1 S001 100 2017 01 01 2 S001 500 2017 02 01 3 S002 400 201
  • 从字母数字字符中删除数字

    我有一个字母数字字符列表 如下所示 x lt c ACO2 BCKDHB456 CD444 我想要以下输出 x lt c ACO BCKDHB CD 您可以使用gsub为了这 gsub digit x or gsub 0 9 x 1 ACO
  • 按自定义顺序对 Laravel Collection 进行排序,而不是 asc 或 desc

    我有这个数组作为 Laravel Collection data id 863368 reference Ref 1 status 1 id 863391 reference Ref 2 status 2 id 863390 referen
  • 当请求是文本时,如何在 wso2 esb foreach 中介器上添加表达式

    我使用了 wso2 foreach 调解器 当我的请求是 Json 时 我添加 data 作为表达式 foreach表达式 数据 我的json请求是这样的 data id 1 name abc id 2 name efg 这个场景效果很好
  • 将 ShortInt 数组转换为字符串,Delphi

    我正在按照我学到的方式做 那就是 用FOR并一一获取索引数组 但它离开太慢了 否则会将其转换为字符串吗 离开得更快吗 就我而言 它将是 ShortInt 的动态数组 例如 给定以下输入 0 20 15 我想要以下输出 0 20 15 我怀疑
  • 将英国格式日期转换为日期时间

    我在 SQL 中有一个代表日期的值 但它的类型为 nvarchar 日期值的格式为 dd mm yyyy hh mm 我需要通过 CCure 800 数据库的视图来呈现本专栏DATETIME格式 我期望使用CAST CONVERT 但是 当
  • 与 Rugged 执行“快进”合并

    使用 Rugged 执行快进 合并 的规范方法是什么 From here我发现了一个可能的线索 Move branch forward Since there s no fast forward merge in this lib yet
  • 将消费者偏移量重置为 Kafka Streams 的开头

    我正在使用 Kafka 流 并且想要将一些消费者偏移量从 Java 重置到开头 KafkaConsumer seekToBeginning 听起来是正确的做法 但我使用 Kafka Streams KafkaStreams streams