这是我面临的问题的缩影,我遇到了错误。让我尝试在这里重现它。
我正在保存一个DataFrame
as a parquet
,但是当我重新加载时DataFrame
from parquet
文件并再次保存为parquet
,我收到错误。
valuesCol = [('Male','2019-09-06'),('Female','2019-09-06'),('Male','2019-09-07')]
df = spark.createDataFrame(valuesCol,['sex','date'])
# Save as parquet
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
# Load it back
df = spark.read.format('parquet').load('.../temp')
df = df.where(col('sex')=='Male')
# Save it back - This produces ERROR
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
错误信息 -
执行器 22): java.io.FileNotFoundException: 请求的文件
maprfs:///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet
不存在。底层文件可能已更新。
您可以通过运行“REFRESH”显式使 Spark 中的缓存失效
SQL 中的 TABLE tableName' 命令或通过重新创建数据集/数据帧
涉及。
另一个SOquestion解决这个问题。提议的解决方案是refresh该表类似于下面的代码,但这没有帮助。问题在于元数据的刷新。我不知道如何刷新它。
df.createOrReplaceTempView('table_view')
spark.catalog.refreshTable('table_view')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
此问题的解决方法:解决这个问题的一个不优雅的方法是保存DataFrame
as parquet
具有不同名称的文件,然后删除原始文件parquet
文件,最后重命名它parquet
文件更改为旧名称。
# Workaround
import os
import shutil
# Load it back
df = spark.read.format('parquet').load('.../temp')
# Save it back as temp1, as opposed to original temp
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp1')
# Delete the original parquet file
shutil.rmtree('.../temp')
# Renaming the parquet folder.
os.rename('.../temp1','.../temp')
但是,问题是某些 DataFrame 非常大,这可能不是处理它的最佳方法。更不用说重命名是否会导致元数据出现一些问题,我不确定。