使用Spark结构化流读取Kafka数据总是出现超时问题

2023-12-26

这是我使用 Spark Structured Streaming 从 Kafka 读取数据的代码,

//ss:SparkSession is defined before. 
import ss.implicits._
val df = ss
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_server)
  .option("subscribe", topic_input)
  .option("startingOffsets", "latest")
  .option("kafkaConsumer.pollTimeoutMs", "5000")
  .option("failOnDataLoss", "false")
  .load()

这是错误代码,

  Caused by: java.util.concurrent.TimeoutException: Cannot fetch record xxxx for offset in 5000 milliseconds

如果我把5000放大到10000,这个错误仍然发生。 我通过谷歌搜索了这个问题。似乎没有太多关于这个问题的相关信息。

这是 sbt 文件中与此问题相关的部分。

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" exclude ("org.apache.kafka", "kafka-clients")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"

我也遇到这个错误。

我查看了KafkaSourceRDD的源代码,一无所获。

我猜 kafka 连接器有问题,因此我排除了“spark-sql-kafka-0-10_2.11”包中的 kafka-client,并添加了一个新的依赖项,如下所示:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.3.0</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <artifactId>kafka-clients</artifactId>
                <groupId>org.apache.kafka</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>

现在可以了。希望能帮助到你。

我创建了一个 jira 问题来报告此问题:https://issues.apache.org/jira/browse/SPARK-23829 https://issues.apache.org/jira/browse/SPARK-23829

2018年12月17日更新:Spark 2.4和Kafka2.0解决了该问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用Spark结构化流读取Kafka数据总是出现超时问题 的相关文章

随机推荐