如何解决使用 Spark 从 S3 重新分区大量数据时从内存中逐出缓存的表分区元数据的问题?

2024-05-20

在尝试从 S3 重新分区数据帧时,我收到一个一般错误:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 in stage 1.0 failed 4 times, most recent failure: Lost task 33.4 in stage 1.0 (TID 88, 172.44.16.141, executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: worker lost

当我检查驱动程序日志时,我在警告后看到相同的一般错误:

20/07/22 15:47:21 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.

我无法理解为什么我会面临这个警告,即使我已经调整了 Spark理性地 https://stackoverflow.com/a/37871195/10834788.

我读到了这个警告here https://stackoverflow.com/a/48202689/10834788.

我的 Spark 配置是:

 Workers (8)
Worker Id   Address State   Cores   Memory
worker id   add     ALIVE   2 (2 Used)  502.1 GB (27.0 GB Used)
worker id   add     ALIVE   2 (2 Used)  61.8 GB (27.0 GB Used)
worker id   add     ALIVE   2 (2 Used)  61.8 GB (27.0 GB Used)
worker id   add     ALIVE   2 (2 Used)  61.8 GB (27.0 GB Used)

Running Applications (1)
Application ID  Name    Cores   Memory/Executor Submitted Time  User    State   Duration
app-id          app-name 8          27.0 GB     t1              default RUNNING 22 min

行数:9367548942

数据分区为:3046

Code:

!pip install pyspark==2.4.3

import datetime
import os
import pyspark
import pandas
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t
import socket

os.environ['PYSPARK_PYTHON'] = '/opt/app-root/bin/python3'
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/app-root/bin/python3'
# spark.jars.ivy={os.environ['HOME']}
SPARK_CLUSTER = 'spark://'
S3_ENDPOINT = ''
SOURCE_BUCKET = ''
SOURCE_BUCKET_AWS_ACCESS_KEY_ID = ""
SOURCE_BUCKET_AWS_SECRET_ACCESS_KEY = ""
SPARK_APP_NAME = f'repartition - {datetime.datetime.now().strftime("%Y-%m-%d %H:%M")}'
HOSTNAME = socket.gethostbyname(socket.gethostname())
print('Spark Cluster: {}'.format(SPARK_CLUSTER))
print('S3 endpoint: {}'.format(S3_ENDPOINT))
print('Source Bukcet: {}'.format(SOURCE_BUCKET))
print('Spark App Name: {}'.format(SPARK_APP_NAME))
print('Hostname: {}'.format(HOSTNAME))

os.environ['AWS_ACCESS_KEY_ID'] = SOURCE_BUCKET_AWS_ACCESS_KEY_ID
os.environ['AWS_SECRET_ACCESS_KEY'] = SOURCE_BUCKET_AWS_SECRET_ACCESS_KEY

def create_spark_config(spark_cluster, executor_memory='16g', executor_cores='4', max_cores='16'):
    print('Spark cluster is: {}'.format(spark_cluster))
    sc_conf = (
        pyspark.SparkConf().setMaster(spark_cluster) \
        .set('spark.driver.host', HOSTNAME) \
        .set('spark.driver.port', 42000) \
        .set('spark.driver.bindAddress', '0.0.0.0') \
        .set('spark.driver.blockManager.port', 42100) \
        .set('spark.executor.memory', '27g') \
        .set('spark.executor.cores', '2') \
        .set('spark.sql.parquet.enableVectorizedReader', True)
#         .set('spark.sql.autoBroadcastJoinThreshold', '524288000')
    )
    return sc_conf

def setup_spark():
    spark_config = create_spark_config(SPARK_CLUSTER)
    print('spark_config is: {}'.format(spark_config))
    print("Creating Spark Session at cluster: {}".format(SPARK_CLUSTER))
    spark = SparkSession.builder.appName(SPARK_APP_NAME).enableHiveSupport().config(conf=spark_config).getOrCreate()
    hadoopConf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoopConf.set('fs.s3a.endpoint', S3_ENDPOINT)
    hadoopConf.set('fs.s3a.path.style.access', 'true')
    hadoopConf.set('fs.s3a.access.key', os.environ.get('AWS_ACCESS_KEY_ID'))
    hadoopConf.set('fs.s3a.secret.key', os.environ.get('AWS_SECRET_ACCESS_KEY'))
    hadoopConf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    print("hadoop is configured!")
    return spark

try:
    spark.stop()
    spark = setup_spark()
except:
    spark = setup_spark()


df = spark.read.parquet(f'{src_path}')

df.count()

df.rdd.getNumPartitions()

df.printSchema()


df.repartition("created_year", "created_month", "created_day").write.partitionBy("created_year", "created_month", "created_day").parquet(dest_path)

完整的堆栈跟踪是:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-19-61d00a6c140f> in <module>
----> 1 df.repartition("created_year", "created_month", "created_day").write.partitionBy("created_year", "created_month", "created_day").parquet(dest_path)

/opt/app-root/lib/python3.6/site-packages/pyspark/sql/readwriter.py in parquet(self, path, mode, partitionBy, compression)
    837             self.partitionBy(partitionBy)
    838         self._set_opts(compression=compression)
--> 839         self._jwrite.parquet(path)
    840 
    841     @since(1.6)

/opt/app-root/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/app-root/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/app-root/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o147.parquet.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 34 in stage 5.0 failed 4 times, most recent failure: Lost task 34.10 in stage 5.0 (TID 3562, 172.44.32.75, executor 41): ExecutorLostFailure (executor 41 exited caused by one of the running tasks) Reason: worker lost
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    ... 33 more

None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何解决使用 Spark 从 S3 重新分区大量数据时从内存中逐出缓存的表分区元数据的问题? 的相关文章

  • Matplotlib imshow:如何在矩阵上应用蒙版

    我正在尝试以图形方式分析二维数据 matplotlib imshow在这方面非常有用 但我觉得如果我可以从矩阵中排除一些单元格 超出感兴趣范围的值 我可以更多地利用它 我的问题是这些值使我感兴趣的范围内的色彩图 变平 排除这些值后 我可以获
  • 会话cookie太大烧瓶应用程序[重复]

    这个问题在这里已经有答案了 我正在尝试使用会话 本地 加载某些数据 并且它已经工作了一段时间 但是现在我收到以下警告 并且不再加载通过会话加载的数据 b session cookie 太大 该值是 13083 字节 但是 标头需要 44 个
  • 合并一个对(元组)列表?

    从链接对的列表中 我想将这些对组合成公共 ID 组 这样我就可以将 group ids 写回数据库 例如 UPDATE table SET group n WHERE id IN Example 1 2 3 4 1 5 6 3 7 8 be
  • time.sleep - TypeError:需要一个浮点[关闭]

    Closed 这个问题是无法重现或由拼写错误引起 help closed questions 目前不接受答案 time sleep 2 TypeError a float is required 我该如何解决 我不确定我应该在这里做什么 您
  • Python 2.7从非默认目录打开多个文件(对于opencv)

    我在 64 位 win7 上使用 python 2 7 并拥有 opencv 2 4 x 当我写 cv2 imread pic 时 它会在我的默认 python 路径中打开 pic 即C Users Myname 但是我如何设法浏览不同的目
  • 计算两个节点之间的最长路径 NetworkX

    我正在尝试使用 Networkx 制作甘特图 网络中的所有节点都是完成项目所需执行的 任务 使用 Networkx 可以轻松计算项目的总时间 但是制作甘特图我需要每个节点的最新启动 NetworkX 包含一个函数 dag longest p
  • 如何在 matplotlib 图中禁用 xkcd?

    您可以通过以下方式打开 xkcd 风格 import matplotlib pyplot as plt plt xkcd 但如何禁用它呢 I try self fig clf 但这行不通 简而言之 要么使用 Valentin 提到的上下文管
  • 将多个 csv 文件连接成具有相同标头的单个 csv

    我目前正在使用以下代码导入 6 000 个 csv 文件 带标题 并将它们导出到单个 csv 文件 带单个标题行 import csv files from folder path r data US market merged data
  • Python 结构的 PHP 替代品

    我很高兴在我的 Python 项目中使用 Fabric 进行部署 现在我正在从事一个更大的 PHP 项目 想知道是否有类似 PHP 的 Fabric 之类的东西 唔 为什么这有关系 Fabric 只是 python 脚本 所以它与项目语言无
  • 调度算法,找到设定长度的所有非重叠区间

    我需要为我的管理应用程序实现一种算法 该算法将告诉我何时可以将任务分配给哪个用户 我实现了一个蛮力解决方案 它似乎有效 但我想知道是否有更有效的方法来做到这一点 为了简单起见 我重写了算法以对数字列表进行操作 而不是数据库查询等 下面我将尝
  • python 从字典中获取唯一值

    我想从我的字典中获取唯一的值 Input 320 167 316 0 319 167 401 167 319 168 380 167 265 166 期望的输出 167 0 168 166 我的代码 unique values sorted
  • Keras CNN 回归模型损失低,准确度为 0

    我在 keras 中遇到这个 NN 回归模型的问题 我正在研究一个汽车数据集 以根据 13 个维度预测价格 简而言之 我已将其读取为 pandas 数据帧 将数值转换为浮点数 缩放值 然后对分类值使用 one hot 编码 这创建了很多新列
  • Python 正则表达式中的 \B+ 与 [\B]+ 与 [^\b]+

    我在回答 SO 问题时遇到了一个我不明白的问题 我创建了一个简化的示例来说明该问题 场景 我正在测试两个标记 不是随机的英语单词 在字符串中至少相距一定距离 在这个例子中 我们有一个动物列表 我们要确保在羊和狼之间至少还有其他三种动物 否则
  • Python - 根据条件调用函数

    我想知道是否有一种简洁的方法来根据条件调用函数 我有这个 if list 1 some dataframe df myfunction 我想知道这是否有可能三元运算符 http book pythontips com en latest t
  • 当输入是 DataFrame 时,在seaborn中对箱线图进行分组

    我打算在一个图中绘制多个列pandas dataframe 全部按另一列分组 使用groupby inside seaborn boxplot 对于类似的问题 这里有一个很好的答案matplotlib matplotlib 分组箱线图 ht
  • 如何在 Windows 10 上将 ipynb 文件与 Jupyter Lab(来自 Anaconda)关联

    我使用 Windows 10 Jupiter Lab 是从 Anaconda 安装的 我想交往ipynb使用 Jupyter Lab 保存文件 这样 当我双击ipynb文件应使用 Jupyter Lab 打开 我该怎么做 Install n
  • 忽略稀疏矩阵中的重复条目

    我尝试过初始化csc matrix and csr matrix从列表中 data rows cols 值如文档所示 sparse csc matrix data rows cols shape n n 问题是 我实际上拥有的生成方法dat
  • Pyspark:相当于 np.where [重复]

    这个问题在这里已经有答案了 这个操作在 Pyspark 中相当于什么 import pandas as pd import numpy as np df pd DataFrame Type list ABBC Set list ZZXY d
  • 在绘图中的线间隙之间添加注释

    I have a graph like this 而不是在上面的日子symbol 我想知道是否有办法可以在行之间添加此注释 从一个点到另一个点 如果以防万一 这可能是重复的 我深表歉意 This is my expected output
  • 我可以在某些网格中打印带有颜色的 pandas 数据框吗?

    我有一个 pandas DataFrame 我想突出显示一些数据 例如 In 1 import pandas as pd In 2 import numpy as np In 3 df pd DataFrame np reshape ran

随机推荐