从 RDD 访问 KafkaOffset 时出现异常

2023-11-29

我有一个来自 Kafka 的 Spark 消费者。 我正在尝试管理一次性语义的偏移量。

但是,在访问偏移量时,它会引发以下异常:

“java.lang.ClassCastException:org.apache.spark.rdd.MapPartitionsRDD 无法转换为 org.apache.spark.streaming.kafka.HasOffsetRanges”

执行此操作的代码部分如下:

var offsetRanges = Array[OffsetRange]()
dataStream
  .transform { 
    rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
   }
   .foreachRDD(rdd => { })

这里的 dataStream 是使用 KafkaUtils API 创建的直接流(DStream[String]),如下所示:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)

如果有人可以帮助我理解我在这里做错了什么。 正如官方文档中提到的,transform 是对数据流执行的方法链中的第一个方法

Thanks.


你的问题是:

.map(._2)

这创建了一个MapPartitionedDStream而不是DirectKafkaInputDStream由...制作KafkaUtils.createKafkaStream.

你需要map after transform:

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+""+t))

kafkaStream
  .transform { 
    rdd => 
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
  }
  .map(_._2)
  .foreachRDD(rdd => // stuff)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 RDD 访问 KafkaOffset 时出现异常 的相关文章

随机推荐