使用PySpark以本地模式读取文件时出现OutOfMemoryError

2024-02-24

我有大约十几个 gpg 加密文件,其中包含我想使用 PySpark 分析的数据。我的策略是将解密函数作为平面映射应用到每个文件,然后在记录级别进行处理:

def read_fun_generator(filename):
    with gpg_open(filename[0].split(':')[-1], 'r') as f:
        for line in f:
            yield line.strip()

gpg_files = sc.wholeTextFiles(/path/to/files/*.gpg)
rdd_from_gpg = gpg_files.flatMap(read_fun_generator).map(lambda x: x.split('|'))
rdd_from_gpg.count()  # <-- For example...

当在本地模式下使用单线程时,这种方法非常有效,即将主服务器设置为local[1]。但是,使用多个线程会导致OutOfMemoryError被扔掉。我尝试过增加spark.executor.memory and spark.driver.memory to 30g,但这似乎没有帮助。我可以在用户界面中确认这些设置已卡住。 (我的机器有超过 200GB 的可用内存。)但是,我在日志中注意到块管理器似乎仅以 265.4 MB 的内存启动。我想知道这是否相关?

这是我开始的完整配置:

conf = (SparkConf()
         .setMaster("local[*]")
         .setAppName("pyspark_local")
         .set("spark.executor.memory", "30g")
         .set("spark.driver.memory", "30g")
         .set("spark.python.worker.memory", "5g")
       )
sc = SparkContext(conf=conf)

这是我的日志中的堆栈跟踪:

15/06/10 11:03:30 INFO SparkContext: Running Spark version 1.3.1
15/06/10 11:03:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/10 11:03:31 INFO SecurityManager: Changing view acls to: santon
15/06/10 11:03:31 INFO SecurityManager: Changing modify acls to: santon
15/06/10 11:03:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(santon); users with modify permissions: Set(santon)
15/06/10 11:03:31 INFO Slf4jLogger: Slf4jLogger started
15/06/10 11:03:31 INFO Remoting: Starting remoting
15/06/10 11:03:32 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:44347]
15/06/10 11:03:32 INFO Utils: Successfully started service 'sparkDriver' on port 44347.
15/06/10 11:03:32 INFO SparkEnv: Registering MapOutputTracker
15/06/10 11:03:32 INFO SparkEnv: Registering BlockManagerMaster
15/06/10 11:03:32 INFO DiskBlockManager: Created local directory at /tmp/spark-24dc8f0a-a89a-44f8-bb95-cd5514e5bf0c/blockmgr-85b6f082-ff5a-4a0e-b48a-1ec62715dda0
15/06/10 11:03:32 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/06/10 11:03:32 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7b2172ed-d658-4e11-bbc1-600697f3255e/httpd-5423f8bc-ec43-48c5-9367-87214dad54f4
15/06/10 11:03:32 INFO HttpServer: Starting HTTP Server
15/06/10 11:03:32 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/10 11:03:32 INFO AbstractConnector: Started [email protected] /cdn-cgi/l/email-protection:50366
15/06/10 11:03:32 INFO Utils: Successfully started service 'HTTP file server' on port 50366.
15/06/10 11:03:32 INFO SparkEnv: Registering OutputCommitCoordinator
15/06/10 11:03:32 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/10 11:03:32 INFO AbstractConnector: Started [email protected] /cdn-cgi/l/email-protection:4040
15/06/10 11:03:32 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/06/10 11:03:32 INFO SparkUI: Started SparkUI at localhost:4040
15/06/10 11:03:32 INFO Executor: Starting executor ID <driver> on host localhost
15/06/10 11:03:32 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@localhost:44347/user/HeartbeatReceiver
15/06/10 11:03:33 INFO NettyBlockTransferService: Server created on 46730
15/06/10 11:03:33 INFO BlockManagerMaster: Trying to register BlockManager
15/06/10 11:03:33 INFO BlockManagerMasterActor: Registering block manager localhost:46730 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 46730)
15/06/10 11:03:33 INFO BlockManagerMaster: Registered BlockManager
15/06/10 11:05:19 INFO MemoryStore: ensureFreeSpace(215726) called with curMem=0, maxMem=278302556
15/06/10 11:05:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 210.7 KB, free 265.2 MB)
15/06/10 11:05:19 INFO MemoryStore: ensureFreeSpace(31533) called with curMem=215726, maxMem=278302556
15/06/10 11:05:19 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 30.8 KB, free 265.2 MB)
15/06/10 11:05:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:46730 (size: 30.8 KB, free: 265.4 MB)
15/06/10 11:05:19 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/06/10 11:05:19 INFO SparkContext: Created broadcast 0 from wholeTextFiles at NativeMethodAccessorImpl.java:-2
15/06/10 11:05:22 INFO FileInputFormat: Total input paths to process : 16
15/06/10 11:05:22 INFO FileInputFormat: Total input paths to process : 16
15/06/10 11:05:22 INFO CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 71665121
15/06/10 11:05:22 INFO SparkContext: Starting job: count at <timed exec>:2
15/06/10 11:05:22 INFO DAGScheduler: Got job 0 (count at <timed exec>:2) with 2 output partitions (allowLocal=false)
15/06/10 11:05:22 INFO DAGScheduler: Final stage: Stage 0(count at <timed exec>:2)
15/06/10 11:05:22 INFO DAGScheduler: Parents of final stage: List()
15/06/10 11:05:22 INFO DAGScheduler: Missing parents: List()
15/06/10 11:05:22 INFO DAGScheduler: Submitting Stage 0 (PythonRDD[1] at count at <timed exec>:2), which has no missing parents
15/06/10 11:05:23 INFO MemoryStore: ensureFreeSpace(6264) called with curMem=247259, maxMem=278302556
15/06/10 11:05:23 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.1 KB, free 265.2 MB)
15/06/10 11:05:23 INFO MemoryStore: ensureFreeSpace(4589) called with curMem=253523, maxMem=278302556
15/06/10 11:05:23 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.5 KB, free 265.2 MB)
15/06/10 11:05:23 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:46730 (size: 4.5 KB, free: 265.4 MB)
15/06/10 11:05:23 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/06/10 11:05:23 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/06/10 11:05:23 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[1] at count at <timed exec>:2)
15/06/10 11:05:23 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/06/10 11:05:23 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1903 bytes)
15/06/10 11:05:23 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 3085 bytes)
15/06/10 11:05:23 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/06/10 11:05:23 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/06/10 11:05:26 INFO WholeTextFileRDD: Input split: Paths:[gpg_files]
15/06/10 11:05:40 ERROR Utils: Uncaught exception in thread stdout writer for /anaconda/python/bin/python
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
    at java.nio.CharBuffer.toString(CharBuffer.java:1201)
    at org.apache.hadoop.io.Text.decode(Text.java:405)
    at org.apache.hadoop.io.Text.decode(Text.java:382)
    at org.apache.hadoop.io.Text.toString(Text.java:280)
    at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:86)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:421)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
Exception in thread "stdout writer for /anaconda/python/bin/python" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
    at java.nio.CharBuffer.toString(CharBuffer.java:1201)
    at org.apache.hadoop.io.Text.decode(Text.java:405)
    at org.apache.hadoop.io.Text.decode(Text.java:382)
    at org.apache.hadoop.io.Text.toString(Text.java:280)
    at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:86)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:421)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
15/06/10 11:05:47 INFO PythonRDD: Times: total = 24140, boot = 2860, init = 664, finish = 20616
15/06/10 11:05:47 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1873 bytes result sent to driver
15/06/10 11:05:47 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 24251 ms on localhost (1/2)

有人遇到过这个问题吗?是否有我不知道应该修改的设置?看来这个应该是可以的...


sc.wholeTextFiles(/path/to/files/*.gpg) 的内容 - 返回 PairRDD,键 - 文件名和值 - 是文件内容。

看起来您没有使用文件内容部分,但仍然告诉 Spark 从磁盘读取文件并将其发送给工作人员。

如果您的目标是仅处理文件名列表,并使用 gpg_open 读取它们的内容,您可以这样做:

def read_fun_generator(filename):
    with gpg_open(filename.split(':')[-1], 'r') as f:
        for line in f:
            yield line.strip()

gpg_filelist = glob.glob("/path/to/files/*.gpg")
# generate RDD with file name per record
gpg_files = sc.parallelize(gpg_filelist)

rdd_from_gpg = gpg_files.flatMap(read_fun_generator).map(lambda x: x.split('|'))
rdd_from_gpg.count()  # <-- For example...

这将减少 Spark 的 JVM 使用的内存量。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用PySpark以本地模式读取文件时出现OutOfMemoryError 的相关文章

随机推荐

  • 使用 JS 或 jQuery 处理表单提交时的 500 错误?

    我正在使用标准表单 操作来发布到宁静的网络服务 由于表单的大小和构成 我尝试不使用ajax 有没有办法将错误处理附加到表单提交中 请参阅下面我当前的代码
  • WCF + Windows Phone 7

    是否可以使用 Windows Phone 7 应用程序的 WCF 服务 任何链接都会非常有帮助 Thanks 以下是将 WP7 应用程序连接到 WCF 服务的快速演练 向右滚动到他的第一个教程 AfricanGeek Silverlight
  • 调用 XSL 模板时的可选参数

    有没有办法使用可选参数调用 XSL 模板 例如
  • 撤消“设置为起始页”

    我将 ASP net 页面之一设置为 Visual Studio 中的默认起始页 当我尝试调试我的项目时 这会导致 404 错误 我该如何清除这个 Thanks Barry 右键单击 MVC 项目并选择属性 转到网络选项卡 在 开始操作 下
  • TFS 中的孤立分支

    我们在 TFS 中有一个主干 每个人都在工作 直到我们需要分支为止 我们的上一个项目是一个需要分支的大型功能 现在开发已经完成 更改已合并回主干 开发分支应该发生什么 我应该删除它吗 以某种方式将其标记为只读 隐形和锁定怎么样 You ca
  • 如何从两个列表中删除与单独列表的重复值相对应的非最大值索引?

    我有两个列表 第一个列表代表观察时间 第二个列表代表这些时间的观察值 我试图在给定不同长度的滚动窗口的情况下找到最大观测值和相应的时间 例如 这是两个列表 observed values linspeed 280 0 275 0 300 0
  • 是否有任何理由从资源中预加载可绘制对象?

    Android 是否维护应用程序可绘制资源的内存缓存并重用它们 或者预加载可能动态分配给不同小部件的所有可绘制资源是一个好习惯吗 例如 public static final int SETS R drawable set0 R drawa
  • 为什么 RelayCommand 中使用弱引用?

    我最近从 MVVMLight 3 升级到 4 并注意到我的命令损坏了 事实证明 在新的 RelayCommand 在版本 3 5 中实现 中使用弱引用导致我正在使用的代码构造失败 我知道存在一些与内存泄漏有关的弱引用的争论 我只是不明白 这
  • 如何将枚举值添加到列表中

    我有以下枚举 public enum SymbolWejsciowy K1 K2 K3 K4 K5 K6 K7 K8 我想使用此枚举的值创建一个列表 public List
  • 如何在Github上显示Markdown文件中的图像?

    我想在 Github 上的 Markdown 文件中显示一些图像 我发现它是这样工作的 Figure 1 1 https raw github com username repo master images figure 1 1 png F
  • 如何使用 underscorejs 进行分组并获取平均值

    如何分组category并使用下划线获得平均值 我有一系列对象 它应该按以下方式分组category和平均值Analytics计算自val属性 即 1 2 gt 3 3 类别总数 所以 3 2 gt 1 5 预期输出 Analytics 1
  • 如何检查 vDSP 函数在 neon 上运行的是标量还是 SIMD

    我目前正在使用 vDSP 框架中的一些函数 尤其是 vDSP conv 我想知道是否有任何方法可以检查该函数是否调用标量模式或在 neon 处理器上处理 SIMD The 文档 https developer apple com libra
  • iOS 5:设置输入类型的最小值和最大值=“日期”

    我想弄清楚如何设置一个的最小值和最大值input type date 我在网上找不到任何东西 也没有自己弄清楚 我需要设置最小值和最大值来验证年龄 我想这可能是这样的
  • 为什么没有对未使用的 let 绑定发出警告?

    C 对作为编译时常量的未使用变量发出警告 static void Main string args var unused hey CS0219 The variable unused is assigned but its value is
  • 如何在 IBM System i Access for Windows GUI Tool 中调用存储过程

    我想测试在 AS400 系统上运行的 DB2 存储过程 我安装了 IBM System i Access for Windows 并且可以针对 DB2 数据库运行 SQL 命令 我的问题是 执行接受参数并返回结果作为输出参数并将值打印到屏幕
  • 在 Node.js 中通过 ejs 使用 AJAX

    我想弄清楚如何在node js中使用ajax 我现在有这个 我如何在我的内部显示例如 order 0 name 和 order 1 name div id champ 当我按下名为 Press 的按钮时 app js var express
  • Internet Explorer 中的 RGBa

    我知道IE不支持RGBa 我还知道您可以使用以下方法 For IE 5 5 7 filter progid DXImageTransform Microsoft gradient startColorstr 99000000 endColo
  • 检查输入字段是否在普通 JavaScript 中具有焦点

    使用 jQuery 我可以测试输入字段是否具有焦点 如下所示 if is focus 不使用 jQuery 如何做到这一点 这个问题在这里得到了回答 Javascript 检测输入是否获得焦点 https stackoverflow com
  • tomcat无法建立ssl连接

    我无法与 tomcat 建立 ssl 连接 Chrome 写道 107 net ERR SSL PROTOCOL ERROR 我已经通过 keytool 生成了 mystore 文件 gt keytool genkey alias tomc
  • 使用PySpark以本地模式读取文件时出现OutOfMemoryError

    我有大约十几个 gpg 加密文件 其中包含我想使用 PySpark 分析的数据 我的策略是将解密函数作为平面映射应用到每个文件 然后在记录级别进行处理 def read fun generator filename with gpg ope