我正在使用 Spark 结构化流从 Kafka 队列中读取数据。读完卡夫卡后我正在申请filter
on the dataframe
。我正在将这个过滤后的数据帧保存到镶木地板文件中。这会生成许多空镶木地板文件。有什么办法可以停止写入空文件吗?
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaServer) \
.option("subscribe", KafkaTopics) \
.load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
decompDF = Transaction_DF.select(zip_extract("value").alias("decompress"))
filterDF = decomDF.filter(.....)
query = filterDF .writeStream \
.option("path", outputpath) \
.option("checkpointLocation", RawXMLCheckpoint) \
.start()
有什么办法可以停止写入空文件。
是的,但你宁愿not do it.
许多空 parquet 文件的原因是 Spark SQL(结构化流的底层基础设施)尝试猜测加载数据集的分区数量(每批来自 Kafka 的记录),并且执行得“很差”,即许多分区没有数据。
当您保存没有数据的分区时,您将得到一个空文件。
您可以使用repartition
or coalesce
操作员设置适当的分区数量并减少(甚至完全避免)空文件。看数据集API.
你为什么要not do it? repartition
and coalesce
由于在分区(以及可能是 Spark 集群中的节点)之间重新整理数据的额外步骤,可能会导致性能下降。这可能很昂贵并且不值得这样做(因此我说你宁愿不这样做)。
然后您可能会问自己,如何知道正确的分区数量?这是一个非常好的问题any星火项目。答案相当简单(如果您了解 Spark 处理什么以及如何处理,那么答案就很明显):“了解您的数据”,以便您可以计算有多少数据是完全正确的。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)