Spark Streaming kafka 偏移量管理

2024-02-15

我一直在做 Spark Streaming 工作,通过 kafka 消费和生成数据。我使用的是directDstream,所以我必须自己管理偏移量,我们采用redis来写入和读取偏移量。现在有一个问题,当我启动我的客户端时,我的客户端需要从redis获取偏移量,而不是kafka中存在的偏移量本身。如何显示我编写我的代码?现在我已经在下面编写了我的代码:

   kafka_stream = KafkaUtils.createDirectStream(
    ssc,
    topics=[config.CONSUME_TOPIC, ],
    kafkaParams={"bootstrap.servers": config.CONSUME_BROKERS,
                 "auto.offset.reset": "largest"},
    fromOffsets=read_offset_range(config.OFFSET_KEY))

但我认为 fromOffsets 是 Spark-streaming 客户端启动时的值(来自 Redis),而不是运行期间的值。谢谢您的帮助。


如果我理解正确的话,您需要手动设置偏移量。我就是这样做的:

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import TopicAndPartition

stream = StreamingContext(sc, 120) # 120 second window

kafkaParams = {"metadata.broker.list":"1:667,2:6667,3:6667"}
kafkaParams["auto.offset.reset"] = "smallest"
kafkaParams["enable.auto.commit"] = "false"

topic = "xyz"
topicPartion = TopicAndPartition(topic, 0)
fromOffset = {topicPartion: long(PUT NUMERIC OFFSET HERE)}

kafka_stream = KafkaUtils.createDirectStream(stream, [topic], kafkaParams, fromOffsets = fromOffset)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming kafka 偏移量管理 的相关文章

随机推荐

  • 如何在 Rust 2018 中为 crate 指定别名?

    我有一个箱子foo sys 在 Rust 2015 中我使用了extern crate foo sys as foo为了方便起见 但在 Rust 2018 中extern crate不再需要 我不想仅将它用于别名 掉落时extern cra
  • Tkinter 变量跟踪方法回调的参数是什么?

    Python 有 Tkinter 变量的类StringVar BooleanVar 等等这些都共享方法get set string and trace mode callback The callback函数作为第二个参数传递给trace
  • 如何使视频的宽度为100%或高度为100%

    我有同样的问题this https stackoverflow com questions 27328009 100 height or 100 width但我正在尝试这样做
  • 是否可以将单个文件从一个 SVN 存储库迁移到另一个存储库,同时保留历史记录?

    我在我的个人 SVN 存储库中启动了一个小应用程序 它引起了足够的兴趣 值得将开发转移到共享组存储库 可以将应用程序 仅单个文件 及其历史记录迁移到组存储库吗 查看 svnadmin dump svnadmin load 和 svndump
  • Saxon 与 Visual Studio 2010 - 有没有办法使用调试器?

    我们花费了越来越多的时间来应对 Microsoft XSLT 1 0 处理器的限制 我认为现在是开始使用 XSLT 2 0 的时候了 撒克逊似乎是一个不错的选择 问题是 Visual Studio 集成的 XSLT 调试器非常棒 没有人想停
  • NSString stringWithFormat 使用制表符而不是空格

    您可以看到我在字符串后添加了 40 个空格 但它从索引 0 开始 我可以用制表符 t 而不是空格 做同样的事情吗 NSString firstString NSString stringWithFormat stringToWrite st
  • QML 中的 if 语句

    对于 QT 和 QML 来说是全新的 我试图根据两个属性双精度之间的关系设置矩形的颜色callValue and handRaiseXBB 但我收到错误 意外的标记如果 and 需要一个限定名称 id 谁能告诉我我做错了什么 import
  • Arduino:使用串口和软件串口与蓝牙模块

    我的目的是使用 Arduino 通过 HC 05 蓝牙模块在 PC 和 Android 设备之间建立通信 我使用 PC 和 Arduino 串行监视器 之间的 USB 通信以及 SoftwareSerial 来连接到 HC 05 我的问题是
  • 如何在 PHP 中模拟单精度浮点运算?

    我需要将一个简单的 C 程序移植到 PHP 目前我们必须启动该过程并解析其输出 程序很琐碎 但是对于算法的使用很重要float因为错误会累积起来 结果会大相径庭 C 示例 include
  • 强制 jQuery UI Datepicker 显示在输入字段下方

    我有一个页面 其中包含一个表单和几个字段 我的问题如下 我试图强制 jQuery UI 日期选择器显示在输入字段下方 当我单击输入字段时 我希望该字段也滚动到页面顶部 我想我有这个工作 这是我的 jQuery JQUERY document
  • 如何使用 getMouse() 捕获右键单击事件

    我正在尝试使用graphics py 编写用户图形界面 问题是如何捕获右键事件呢 看来函数 getMouse 只能返回鼠标左键单击的位置作为 Point 对象 from graphics import def main win GraphW
  • 使用 servlet 将动态图像传递到 JSP

    我有一个桌面应用程序 可以创建图形 2D 对象 将其粘贴在面板中并进行绘制 我正在尝试使用 servlet 和 jsp 将此应用程序转换为网页 我已经在网上阅读了两天 但无法理解如何做到这一点 我发现的每个例子似乎都遗漏了一个重要的部分 我
  • Teamcity 无法通过 nuget 安装软件包

    已解决 请参阅评论 我在 team city 有一个项目无法安装所需的软件包 这已经工作了一年多 但本周所有构建都开始失败 我的开发流程如下 我们使用github并使用Visual Studio进行本地开发 我们有一个 Windows te
  • 使用对象过滤嵌套数组

    我有一系列类别 每个类别实例都有优惠属性 class Category var offers Offer var title String var id Int class Offer var type String global vari
  • 从 eclipse 运行 Visual VM

    我正在尝试在 Eclipse kepler 中使用 Visual VM 调试 java 应用程序 我正在执行具有选择 Visual VM 作为启动器的主要方法的类 我已经按照中给出的说明配置了 Visual VMhttp blog idrs
  • Chrome 中的 CSS3 过渡不平滑

    我使用 CSS3 过渡根据鼠标悬停时的边距值对某些链接进行动画处理 它的动画效果符合预期 但 Chrome 中的动画并不像其他浏览器 如 Firefox IE10 那样流畅 在 Chrome 中 当我将鼠标悬停在链接上时 所有其他链接都会稍
  • 字符串文字和数组的地址

    int main char str1 Hi str2 Bye printf u u n str1 str1 int arr 5 1 2 3 4 5 printf u u arr arr 这里发生了什么 str and str给出不同的地址和
  • 如何修复 webkit-fake-url 的含义?

    当您尝试复制并粘贴到 Safari Web 浏览器时 浏览器会插入webkit fake url 在 Chrome 中 你可以这样做getAsFile 在剪贴板数据上读取图像 你能用 Safari 做类似的事情吗 不可以 目前无法在 Saf
  • 将 Azure Active Directory 用户添加到 Azure SQL 数据库

    我有一个 Azure SQL Server 可以通过 SSMS 进入其中 我还有一个 Azure Active Directory 其用户名为 电子邮件受保护 cdn cgi l email protection 我想添加此用户以拥有对我的
  • Spark Streaming kafka 偏移量管理

    我一直在做 Spark Streaming 工作 通过 kafka 消费和生成数据 我使用的是directDstream 所以我必须自己管理偏移量 我们采用redis来写入和读取偏移量 现在有一个问题 当我启动我的客户端时 我的客户端需要从