如果我不知道直到偏移量,我可以从 kafka 主题创建 RDD 吗?

2023-11-27

KafkaUtils.createRDD将 offsetRanges 作为参数。我不知道我想读取的主题的直到偏移量。我最多想阅读该主题中的前 30 条消息。

我看到有一个KafkaCluster.html#getLatestLeaderOffsets但这被注释为开发 API。

是否有任何公开方法可以确定某个主题的最早和最新偏移量?


这并不是那么简单的事情,因为只有各个代理知道给定主题/分区的最新偏移量信息是什么。

你可以做一个OffsetRequest。下面将返回主题/分区的最早和最新偏移量(它是 Scala,但如果您不使用 Scala,您应该能够理解)。

请注意,您必须使用SimpleConsumer连接到作为所请求分区的领导者的代理。通常我所做的是,我创建一个SimpleConsumer对于我的每一位经纪人。然后我执行元数据请求并获取分区到领导者的映射,然后对于每个分区我执行以下操作:

def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = {
  val time = kafka.api.OffsetRequest.LatestTime
  val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo]((new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000)))
  val req = new kafka.javaapi.OffsetRequest(reqInfo, kafka.api.OffsetRequest.CurrentVersion, "offReq")
  val resp = consumer.getOffsetsBefore(req)
  val offsets = resp.offsets(topic, partition)
  if (offsets.size > 0) (offsets(offsets.size - 1), offsets(0))
  else (0, -1)
}

希望这可以帮助。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如果我不知道直到偏移量,我可以从 kafka 主题创建 RDD 吗? 的相关文章

随机推荐