我之前已经成功地将 pyspark 用于 Spark Streaming (Spark 2.0.2) 和 Kafka (0.10.1.0),但我的目的更适合结构化流。我尝试使用在线示例:https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html
使用以下类似代码:
ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
query = ds1
.writeStream
.outputMode('append')
.format('console')
.start()
query.awaitTermination()
但是,我总是会遇到以下错误:
: org.apache.kafka.common.config.ConfigException:
Missing required configuration "partition.assignment.strategy" which has no default value
我还尝试在创建 ds1 时将其添加到我的选项集中:
.option("partition.assignment.strategy", "range")
但即使显式地为其分配一个值也不能阻止错误,我可以在网上或 Kafka 文档中找到的任何其他值(如“roundrobin”)也不能阻止错误。
我还尝试使用“分配”选项,并出现了相同的错误(我们的 Kafka 主机设置为分配 - 每个消费者仅分配一个分区,并且我们没有任何重新平衡)。
知道这是怎么回事吗?该文档没有帮助(可能是因为它仍处于实验阶段)。另外,有没有办法使用 KafkaUtils 进行结构化流处理?或者这是唯一的网关?