Pyspark结构化流Kafka配置错误

2024-02-02

我之前已经成功地将 pyspark 用于 Spark Streaming (Spark 2.0.2) 和 Kafka (0.10.1.0),但我的目的更适合结构化流。我尝试使用在线示例:https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html

使用以下类似代码:

ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
query = ds1
  .writeStream
  .outputMode('append')
  .format('console')
  .start()
query.awaitTermination() 

但是,我总是会遇到以下错误:

: org.apache.kafka.common.config.ConfigException: 
Missing required configuration "partition.assignment.strategy" which has no default value

我还尝试在创建 ds1 时将其添加到我的选项集中:

.option("partition.assignment.strategy", "range")

但即使显式地为其分配一个值也不能阻止错误,我可以在网上或 Kafka 文档中找到的任何其他值(如“roundrobin”)也不能阻止错误。

我还尝试使用“分配”选项,并出现了相同的错误(我们的 Kafka 主机设置为分配 - 每个消费者仅分配一个分区,并且我们没有任何重新平衡)。

知道这是怎么回事吗?该文档没有帮助(可能是因为它仍处于实验阶段)。另外,有没有办法使用 KafkaUtils 进行结构化流处理?或者这是唯一的网关?


  1. Kafka 0.10.1.* 客户端存在一个已知问题,您不应该将其与 Spark 一起使用,因为它可能会由于以下原因生成错误的答案https://issues.apache.org/jira/browse/KAFKA-4547 https://issues.apache.org/jira/browse/KAFKA-4547。您可以使用 0.10.0.1 客户端,它应该与 0.10.1.* Kafka 集群一起使用。

  2. 要将 Kafka 配置发送到结构化流中的 Kafka 消费者客户端,您需要添加kafka.前缀,例如.option("kafka.partition.assignment.strategy", "range")。但是,您不需要设置kafka.partition.assignment.strategy因为它有一个默认值。我的预感是您可能将 Kafka 0.8.* 和 0.10.* jar 放在类路径上并加载错误的类。

  3. 您想使用 KafkaUtils 中的哪个 API,但在结构化流中缺失? Spark 2.2.0 刚刚推出,您可以在结构化流中使用 Kafka 的批处理或流式查询。读http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html举些例子。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Pyspark结构化流Kafka配置错误 的相关文章