我一直在做 Spark Streaming 工作,通过 kafka 消费和生成数据。我使用的是directDstream,所以我必须自己管理偏移量,我们采用redis来写入和读取偏移量。现在有一个问题,当我启动我的客户端时,我的客户端需要从redis获取偏移量,而不是kafka中存在的偏移量本身。如何显示我编写我的代码?现在我已经在下面编写了我的代码:
kafka_stream = KafkaUtils.createDirectStream(
ssc,
topics=[config.CONSUME_TOPIC, ],
kafkaParams={"bootstrap.servers": config.CONSUME_BROKERS,
"auto.offset.reset": "largest"},
fromOffsets=read_offset_range(config.OFFSET_KEY))
但我认为 fromOffsets 是 Spark-streaming 客户端启动时的值(来自 Redis),而不是运行期间的值。谢谢您的帮助。
如果我理解正确的话,您需要手动设置偏移量。我就是这样做的:
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import TopicAndPartition
stream = StreamingContext(sc, 120) # 120 second window
kafkaParams = {"metadata.broker.list":"1:667,2:6667,3:6667"}
kafkaParams["auto.offset.reset"] = "smallest"
kafkaParams["enable.auto.commit"] = "false"
topic = "xyz"
topicPartion = TopicAndPartition(topic, 0)
fromOffset = {topicPartion: long(PUT NUMERIC OFFSET HERE)}
kafka_stream = KafkaUtils.createDirectStream(stream, [topic], kafkaParams, fromOffsets = fromOffset)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)