Spark 可扩展性:我做错了什么?

2023-11-21

我正在使用 Spark 处理数据,它可以处理一天的数据(40G),但失败了OOM一周的数据:

import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
          .map(lambda row:(row.id, row.foo))
          for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
  .reduceByKey(operator.add).saveAsTextFile("myoutput")

不同ID的数量小于10k。 每个ID都是一个很小的int。 由于太多执行程序因 OOM 失败而导致作业失败。 当工作成功时(少量投入),"myoutput"大约是100k。

  1. 我究竟做错了什么?
  2. 我尝试更换saveAsTextFile with collect(因为我实际上想在保存之前在 python 中进行一些切片和切块),行为没有差异,同样的失败。这是可以预料的吗?
  3. 我曾经有reduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...])代替sc.union- 哪个更好?有什么区别吗?

该集群有25节点与825GB内存和224其中的核心。

调用是spark-submit --master yarn --num-executors 50 --executor-memory 5G.

单个 RDD 有约 140 列,涵盖一小时的数据,因此一周是 168(=7*24) 个 RDD 的并集。


Spark 在扩展时经常会出现内存不足错误。在这些情况下,程序员应该进行微调。或者重新检查您的代码,以确保您没有做任何过多的事情,例如收集所有bigdata在驱动程序中,这很可能超过内存开销限制,无论你设置多大。

要了解正在发生的事情,您应该意识到何时yarn决定杀死超出内存限制的容器。当容器超出限制时就会发生这种情况内存开销 limit.

在调度程序中,您可以检查事件时间线以查看容器发生了什么。如果 Yarn 杀死了一个容器,它将显示为红色,当您将鼠标悬停/单击它时,您将看到如下消息:

容器因超出内存限制而被 YARN 终止。已使用 16 GB 物理内存中的 16.9 GB。考虑提高spark.yarn.executor.memoryOverhead。

enter image description here


因此,在这种情况下,您要关注的是这些配置属性(值是示例my簇):

# More executor memory overhead
spark.yarn.executor.memoryOverhead          4096

# More driver memory overhead
spark.yarn.driver.memoryOverhead            8192

# Max on my nodes
#spark.executor.cores                        8
#spark.executor.memory                       12G

# For the executors
spark.executor.cores                        6
spark.executor.memory                       8G

# For the driver
spark.driver.cores                          6
spark.driver.memory                         8G

首先要做的就是增加memoryOverhead.

在驱动程序中还是在执行程序中?

当您从 UI 概览集群时,您可以单击尝试 ID 并检查诊断信息其中应该提到被杀死的容器的 ID。如果和你的一样增材制造集装箱,那么它是驱动程序,否则是执行程序。


这并没有解决问题,现在怎么办?

您必须微调所提供的核心数量和堆内存。你看pyspark将在堆外内存中完成大部分工作,因此您不想为堆提供太多空间,因为这会被浪费。你不想给予太少,因为垃圾收集器会出现问题。回想一下,这些是 JVM。

如上所述here,一个工作线程可以托管多个执行程序,因此使用的核心数量会影响每个执行程序拥有的内存量,因此减少 #cores 可能会有所帮助。

我把它写在Spark中的内存开销问题和火花——容器以非零退出代码 143 退出更详细的是,大部分我不会忘记!我还没有尝试过的另一种选择是Spark.默认并行度 or/and spark.storage.memoryFraction,根据我的经验,这没有帮助。


您可以按照 sds 提到的方式传递配置标志,或者像这样:

spark-submit --properties-file my_properties

其中“my_properties”类似于我上面列出的属性。

对于非数值,您可以这样做:

spark-submit --conf spark.executor.memory='4G' 
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 可扩展性:我做错了什么? 的相关文章

随机推荐

  • Chrome 扩展程序弹出窗口中单击时的 JavaScript 警报立即消失

    我正在开发一个 Chrome 扩展程序 并希望在用户单击某些元素时使用 Prompt 获取用户的输入 不幸的是 由于某种原因 当作为 onclick 或在 jQuery something click function 中调用时 我无法让p
  • 使用 JavaScript 使用 HmacSHA256 正确签署字符串

    在用于身份验证的 Houndify API 文档中 您有以下内容块 验证请求的示例 假设我们有以下信息 UserID ae06fcd3 6447 4356 afaa 813aa4f2ba41 RequestID 70aa7c25 c74f
  • 使用 DLR 运行使用 CompileAssemblyFromSource 生成的代码?

    对此进行后续跟进很好的答案 我想知道 DLR 是否使用dynamic关键字可以允许以不太冗长的方式为生成的程序集编写代码 例如 上述答案的代码可以 using Microsoft CSharp CSharpCodeProvider foo
  • Pytorch - 在 softmax 层之后选择最佳概率

    我有一个使用 Pytorch 0 4 0 的逻辑回归模型 其中我的输入是高维的 我的输出必须是标量 0 1 or 2 我使用线性层与 softmax 层相结合来返回n x 3张量 其中每列表示输入属于三个类别之一的概率 0 1 or 2 但
  • 运行基于 OpenMPI 的库时出错

    我已经从 Ubuntu 中可用的标准 apt get install 安装了 openmpi 库 我运行一个调用 MPI 库的 python 代码 我收到以下错误 任何想法错误的根源是什么 是 OpenMPI 配置错误吗 如何解决这个问题
  • 如何查看WTForms验证错误?

    我正在编写一些基本测试 但测试失败了 def test new user registration self self client get user register form RegistrationForm email u email
  • fopen:无法打开流:Mac 上的 PHP 中的权限被拒绝 [重复]

    这个问题在这里已经有答案了 我写了这段代码 if file exists testfile rtf echo file exists else echo file doesn t exist fh fopen testfile rtf w
  • 如何在 Swift 中写入 Google Sheets

    我正在尝试通过以下方式写入 Google Sheets 文档这个谷歌API 但我没有取得太大成功 我什至不确定我是否遵循了正确的 api 因为它显示 Appscript 并且看起来像 Swift 我已经能够使用以下方法从工作表中读取数据 p
  • 当工作线程工作时,UI 变得不稳定

    我有一个手写识别应用程序 用户用手指绘画 该应用程序识别字符 识别引擎在具有尽可能低优先级的工作线程中运行 Thread MIN PRIORITY 它是纯粹的CPU 内存算法 没有任何I O 不过 当线程正在积极工作时 用户界面会变得相当不
  • 人类可读的 type_info.name() [重复]

    这个问题在这里已经有答案了 我编译了以下代码g 并得到输出 该输出写在注释中 template
  • Laravel 4 - 没有可用的猜测者问题

    我收到此错误 LogicException 无法猜测 mime 类型 因为没有可用的猜测器 您启用了 php fileinfo 扩展吗 我已经启用了 php fileinfo 扩展并重新启动了 Wamp Web 服务器 但我仍然无法解决这个
  • AsyncTask Android 示例

    我正在读关于AsyncTask 我尝试了下面的简单程序 但这似乎不起作用 我怎样才能让它发挥作用 public class AsyncTaskActivity extends Activity Button btn Called when
  • Firebase Listener 在空闲时间后无法识别或恢复连接

    我的项目是识别客户端是在线还是离线 我使用 Android Firebase 文档中提供的代码 该代码使用 info connected 中的 EventListener since I can connect from multiple
  • 从字符串源列表动态生成最短的正则表达式

    我有一堆 SKU 库存单位 它们代表一系列字符串 我想创建一个正则表达式来匹配它们 举例来说 如果我有 SKU var skus new BATPAG003 BATTWLP03 BATTWLP04 BATTWSP04 SPIFATB01 我
  • 在 Swift 3 中访问代码错误

    Xcode 8 beta 4 中的新功能 NSError桥接到 SwiftError协议类型 这会在处理失败时影响 StoreKitSKPaymentTransactions 您应该检查以确保没有因为事务被取消而发生错误 以了解是否向用户显
  • Spring REST - 将 GET 参数绑定到嵌套对象

    我知道你可以将 get 请求参数绑定到 pojo 例如 RequestMapping value reservation method RequestMethod GET produces MediaType APPLICATION JSO
  • 在字符串列表中的特定元素之前和之后插入元素

    当出现特定字符串时是否可以插入到列表中 例子 List north south east west south united 因此 每次出现字符串 south 时 列表都会插入一个项目 canada 在列表中元素 south 之前 Resu
  • [NodeJs][Sequelize] ReferenceError:初始化前无法访问“ModelName”

    目前我使用一个API实现节点 Js 13和 ORM续集 v5而这一切都在ES6 通过 package json 中的 type module 在这个项目中 当我尝试使用关联时出现问题 我有三个关联的模型 author js authorbo
  • 我如何推迟 jQuery Each 循环

    我在 jQuery 每个循环中执行 繁重 画布操作 导致较慢的设备 IE 和 iPad 有时变得完全无响应 所以我想我可以使用下划线 defer 将每个循环中的函数排队 例如 function handleAsset defer funct
  • Spark 可扩展性:我做错了什么?

    我正在使用 Spark 处理数据 它可以处理一天的数据 40G 但失败了OOM一周的数据 import pyspark import datetime import operator sc pyspark SparkContext sqc