Spark Streaming Kafka 流

2023-12-28

我在尝试使用 Spark Streaming 读取 kafka 时遇到一些问题。

我的代码是:

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor")
val ssc = new StreamingContext(sparkConf, Seconds(2))

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "consumergroup",
  "metadata.broker.list" -> "localhost:9092",
  "zookeeper.connection.timeout.ms" -> "10000"
  //"kafka.auto.offset.reset" -> "smallest"
)

val topics = Set("test")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

我之前在端口 2181 启动了 Zookeeper,在端口 9092 启动了 Kafka 服务器 0.9.0.0。 但我在 Spark 驱动程序中收到以下错误:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)

动物园管理员日志:

[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)

有什么提示吗?

非常感谢


该问题与错误的 Spark-streaming-kafka 版本有关。

如中所述文档 http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources

Kafka:Spark Streaming 1.5.2 兼容 Kafka 0.8.2.1

所以,包括

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>

在我的 pom.xml (而不是版本 0.9.0.0)中解决了这个问题。

希望这可以帮助

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming Kafka 流 的相关文章

随机推荐

  • 批量:将txt文件中的文件复制到一个文件夹中

    我正在尝试创建一个批处理文件 将文本文件中列出的多个文件复制到新文件夹中 我发现了几个与此相关的线程 但我仍然无法使批处理正常工作 我遇到的问题是txt中列出的文件都位于不同的源位置并且具有不同的扩展名 例如 该列表如下 C Users F
  • CSS边距重叠而不是给出距离[重复]

    这个问题在这里已经有答案了 最近我遇到了边距问题 但我无法解决它 我的 HTML 看起来像这样 div class info box Some text div div class form div CSS info box border
  • 从 Linux 调试 Windows 内核

    我曾经使用调试 Windows 内核虚拟KD WinDBG和一个虚拟机 最近我得到了一台Linux机器 现在我想知道当你的主机无法运行时调试Windows内核的最简单方法是什么虚拟KD WinDBG 我假设该解决方案需要两个虚拟机 但我宁愿
  • Apache Camel 与 IBM MQ

    大家好 有谁曾经将 Camel 与 IBM 的 MQ 一起使用过吗 我们正在考虑将这两种产品一起使用的可能性 但没有这两种产品一起工作的示例 我广泛使用 IBM MQ 和 Camel 两者一起使用没有问题 我将从我的一个 Spring 上下
  • vtk.vtkRender() 导致段错误:11

    我有一个 python 脚本 它不是我编写的 它利用了 vtk 模块 它可以在我的旧 iMac 和 Linux 机器上运行 在安装了 OS X Mavericks 的 Macbook Pro 上尝试时 我在渲染过程中遇到了段错误 我已经追踪
  • 从我的笔记本电脑连接到 AWS MSK Kafka:NoBrokersAvailable

    我在连接到 AWS MSK Kafka 时遇到问题 使用 with访问控制方式配置为None 从我的笔记本电脑 错误是 没有可用的经纪人 我在用纯文本通过端口连接9092 from kafka import KafkaProducer ka
  • 如何检测三星互联网浏览器的深色模式?

    如果可以检测到这一点 是否有可能通过 CSS 类或 JavaScript 影响颜色 使用深色阅读器扩展 还可以提交调整 如果没有其他办法可以改变的话 希望这里也是可能的 三星互联网确实有颜色的奇怪的东西 https www ctrl blo
  • Django Rest 框架:如果未对可浏览 API 进行身份验证,则进行重定向

    我不确定如果用户不满足权限标准 如何实现重定向登录 正如文档所述 在运行视图主体之前 会检查列表中的每个权限 所以视图中的重写方法是没有结果的 如果有人能指出我正确的方向吗 谢谢 Django Rest Framework 是一个用于构建
  • 散景字形坐标与 x_axis_type 'datetime'

    我正在尝试将一个简单的文本字符串 字形 添加到使用的散景图中x axis type datetime 我的代码 精简到其要点 如下 p figure plot width 900 plot height 380 x axis type da
  • 反应酶测试库

    我是反应测试库的新手 我热衷于使用酶 当我浏览文档时 我发现 create react app 允许直接包含第三方测试库 我尝试了以下方法来添加酶 但没有成功 这是我尝试过的 但它在我的 app test js 中不起作用 import R
  • Scrapy:如何调试scrapy丢失的请求

    我有一个 scrapy 蜘蛛 但有时它不返回请求 我发现通过在产生请求之前和获得响应之后添加日志消息 Spider 可以迭代页面并解析每个页面上的项目废弃的链接 这是代码的一部分 SampleSpider BaseSpider def pa
  • 如何在java中使用中文和日文字符作为字符串?

    Hi我正在使用java语言 在此我必须使用一些中文 日文字符作为字符串并使用 System out println 进行打印 我怎样才能做到这一点 Thanks Java Strings支持Unicode 所以中文和日文都没问题 不过 其他
  • 或者不是有效的 C++:为什么这段代码可以编译?

    这是我用 QtCreator 制作的一个非常简单的 C 应用程序 int main int argc char argv int a 1 int b 2 if a lt 1 or b gt 3 return 1 return 0 对我来说
  • Eigen 将旋转和平移组合成一个矩阵

    我有一个旋转矩阵rot Eigen Matrix3d 和平移向量transl Eigen Vector3d 我希望它们一起出现在 4x4 变换矩阵中 我只是为了我的生活不知道如何在 Eigen 中做到这一点 我认为仿射可以以某种方式使用 但
  • Android 中的 Z 索引?

    我在一个 xml 中有多个元素 listview slidingdrawer edittext 和 button 我想滑动抽屉顺序始终位于另一个元素的前面 但我不能 这是我的 xml
  • 与复合组件一起使用时出现重复 ID 异常

    有条件地使用复合组件时 出现重复 ID 异常
  • SwingUtilities.invokeLater()

    我如何感受到 SwingUtilities invokeLater 在任何 swing 应用程序中的重要性 请给出一些代码示例 每当你需要更新 GUI 中的某些内容时 你应该通过AWT 事件线程 这是因为 AWT 以及顶部的 Swing 有
  • 在使用 XP 模式的 Windows 7 上构建 Visual Studio 2010 Silverlight 4 项目时出错

    我在 Windows 7 上的 XP 模式 VM 中安装了 Visual Studio 2010 Beta 2 然后我创建了一个简单的 Silverlight 4 测试版 项目并尝试构建它 我收到以下错误 错误 1 ValidateXaml
  • Spring Security,JUnit:@WithUserDetails 用于在 @Before 中创建的用户

    在使用 Spring MockMVC 的 JUnit 测试中 有两种方法用于验证 Spring Security 用户的身份 WithMockUser使用提供的凭据创建一个虚拟用户 WithUserDetails获取用户名并将其解析为正确的
  • Spark Streaming Kafka 流

    我在尝试使用 Spark Streaming 读取 kafka 时遇到一些问题 我的代码是 val sparkConf new SparkConf setMaster local 2 setAppName KafkaIngestor val