Spark 设置为从最早的偏移量读取 - 在尝试使用 Kafka 上不再可用的偏移量时抛出错误

2024-01-08

我目前正在 Dataproc 上运行 Spark 作业,在尝试重新加入组并从 kafka 主题读取数据时遇到错误。我做了一些挖掘,但不确定问题是什么。我有auto.offset.reset set to earliest所以它应该从最早可用的非提交偏移量中读取,最初我的火花日志如下所示:

19/04/29 16:30:30 INFO     
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-11 to offset 5553330.
19/04/29 16:30:30 INFO     
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-2 to offset 5555553.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-3 to offset 5555484.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-4 to offset 5555586.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-5 to offset 5555502.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-6 to offset 5555561.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-7 to offset 5555542.```

但是接下来的下一行我尝试从服务器上不存在的偏移量读取时遇到错误(您可以看到分区的偏移量与上面列出的偏移量不同,所以我不知道为什么它会尝试读取表单该偏移量,这是下一行的错误:

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets 
out of range with no configured reset policy for partitions: 
{demo.topic-11=4544296}

有什么想法可以解释为什么我的 Spark 工作不断回到这个偏移量(4544296),而不是它最初输出的偏移量(5553330)?

这似乎是自相矛盾的 w a) 它所说的实际偏移量和它尝试读取的偏移量 b) 说没有配置重置策略


这个答案迟了一年,但希望能帮助其他面临类似问题的人。

通常,当消费者尝试读取 Kafka 主题中不再存在的偏移量时,就会出现此行为。偏移量不再存在,通常是因为它已被 Kafka Cleaner 删除(例如由于保留或压缩策略)。然而,消费者组仍然是 Kafka 已知的,并且 Kafka 保留了主题“demo.topic”及其所有分区的组“demo-group”的最新消费消息的信息。

因此,auto.offset.reset配置不会有任何影响,因为不需要重置。相反,卡夫卡了解消费者组。

除此之外Fetcher只告诉您主题的每个分区内最新的可用偏移量。确实如此not自动意味着它实际上轮询直到此偏移量的所有消息。 Spark 决定每个分区实际消耗和处理多少消息(基于例如配置maxRatePerPartition).

要解决此问题,您可以更改消费者组(在这种特殊情况下这可能不是您想要的),或者通过使用手动重置消费者组“演示组”的偏移量

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group demo-group --topic demo.topic --partition 11 --to-latest

根据您的要求,您可以使用该工具重置主题每个分区的偏移量。帮助功能或文档解释了所有可用选项。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 设置为从最早的偏移量读取 - 在尝试使用 Kafka 上不再可用的偏移量时抛出错误 的相关文章

随机推荐

  • 用户位置的自定义注释视图不移动地图视图

    我们可以在 iOS 中为用户当前位置提供自定义注释视图吗 我需要用我自己的自定义视图 比如一些 ping 引脚 删除蓝点 带圆圈 是否有可能做到这一点 如果我们这样做 当用户位置发生变化时 该图钉是否会移动到新位置 或者我们需要以编程方式处
  • node.js websocket 模块已安装,但无法在脚本中运行

    我刚刚安装了node js microsoft Visual 以便能够安装websocket 它安装得很好 C Users Administrator gt npm install websocket npm http GET https
  • 为 linq groupby 编写自定义比较器

    同样 这个示例是我的实际问题的一个非常简化的版本 涉及 linq 分组的自定义比较器 我做错了什么 下面的代码产生下面的结果 1 2 0 4 1 0 4 1 0 1 1 0 然而我期待以下结果 因为 1 1 和 1 2 之间的距离 clas
  • 改变这是什么

    有没有办法改变 THIS 指向的内容 class foo foo fooinstance new foo foo otherfooinstance new foo void foo bar this otherfooinstance foo
  • 错误号2058无法加载插件authentication_windows_client:找不到指定的模块

    MySQL 有一个插件 允许根据当前用户的 Windows 凭据进行用户身份验证 该插件是 authentication windows dll 我从 SQLyog 收到以下错误消息 错误号 2058 插件authentication wi
  • 更改 Flash 播放器音频输出设备

    有没有办法改变Flash播放器的音频输出设备 如果没有的话 有没有swf播放器有这种可能性 谢谢 直到几分钟前我才遇到一个关于此的问题 我的 XP 盒子有两个音频设备 一个 iMic USB 音频 I O 设备 我已将桌面扬声器永久插入其中
  • 如何从反应应用程序中的公共文件夹导入文件?

    我在 public 文件夹中有一个 javascript 文件 我想将该文件导入到文件夹 src components 中的组件 projectFolder publicFolder index html recorder js srcFo
  • eclipse 从 root 显示 README

    以下项目结构并不罕见 项目A 目录 项目B 目录 ProjectX 目录 变更日志 文件 许可证 文件 自述文件 文件 这种结构 README 位于根目录中 得到了不同在线 Git 解决方案 如 github com bitbucket o
  • MDX - NON EMPTY 函数更快?

    我当时的假设是NON EMPTY必须尽可能避免使用该子句 因此 当我意外地发现它实际上使查询速度更快时 我感到震惊 示例如下 select Measures Count Of Requests on 0 Client Client Numb
  • Laravel - 与软删除数据的隐式路由模型绑定

    我有一个小问题 有两种用户角色 一种是普通成员 一种是管理员 成员可以删除博客 并且在删除 软删除 博客后他们将无法看到该博客 而管理员仍然可以看到该博客 即使它是软删除的 示例代码 Route file Route get blog bl
  • AngularJS 指令链接函数未被调用

    我正在尝试将 Angular http auth 库与引导模式窗口一起使用 模态框工作正常 但我在指令方面遇到问题 这是一个 jsfiddle 链接 http jsfiddle net jCUSh 85 http jsfiddle net
  • Android - 从Webview调用Java

    我想从Webview调用Java I have JavaScriptInterface below class JavaScriptInterface private Activity activity public JavaScriptI
  • Rails 4.0.1 中的新记录“没有将符号显式转换为字符串”(仅限)

    在我升级 Rails 4 后 尝试为我的任何 ActiveRecord 类创建新记录会给出 No explicit conversion of Symbol into String 例如 这是我的 links links params 方法
  • 在 FLASK 中运行 pypupeteer 会出现 ValueError: signal only Works in main thread

    我正在尝试将 pyppeteer 集成到 Flask 应用程序中 我有一个运行 pyppeteer 并截取页面屏幕截图的 python 脚本 如果我单独运行该脚本 这是工作文件 The PROBLEM当我在 FLASK 应用程序中运行它时
  • c++ - 不命名类型

    我有一个问题 当我尝试构建以下代码时 我得到 keywords does not name a type whitespace does not name a type 第 18 19 行和第 22 24 行 有人可以帮忙吗 这是代码 cp
  • 我如何解释这个输入?

    我目前使用 ANTLR 在 Java 中实现了一种可用的 简单的语言 我想做的是将其嵌入纯文本中 与 PHP 类似 例如 Lorem ipsum dolor sit amet Phasellus volutpat dignissim sap
  • Woocommerce/Wordpress - 将用户登录重定向到主页

    我已经搜索了这个问题的答案 使用了插件 但仍然没有任何效果 我希望我的网站的用户在登录 注册后被重定向到主页 目前 用户登录并被重定向到我的帐户页面 Woocommerce 提供了此代码 但它对我不起作用 goes in theme fun
  • 如何减少/消除 Angular 应用程序中的内存泄漏

    我正在优化我的大Angular App 当我发现一个Google DevTools非常好发现问题 由于我刚刚开始学习DevTools 我对内存泄漏很困惑 当我在应用程序中的不同页面之间来回移动时 配置文件堆快照大小一次又一次地增加 因此我认
  • 如何在 Java 中为 Swing 组件设置字体粗细

    我想设置不同字体粗细我的 JFrame 对话框上的组件 我该怎么做呢 在下面的Java语句中 setFont new Font Dialog Font BOLD 12 当我使用 Font BOLD 时 它太粗体 当我使用 Font Plai
  • Spark 设置为从最早的偏移量读取 - 在尝试使用 Kafka 上不再可用的偏移量时抛出错误

    我目前正在 Dataproc 上运行 Spark 作业 在尝试重新加入组并从 kafka 主题读取数据时遇到错误 我做了一些挖掘 但不确定问题是什么 我有auto offset reset set to earliest所以它应该从最早可用