如何修复 BrokenProcessPool:concurrent.futures ProcessPoolExecutor 的错误

2024-01-02

使用并发.futures.ProcessPoolExecutor 我尝试运行第一段代码来并行执行函数“Calculate_Forex_Data_Derivatives(data,gride_spacing)”。当调用结果 executor_list[i].result() 时,我得到“BrokenProcessPool:进程池中的进程在 future 正在运行或挂起时突然终止”。我尝试运行代码将函数的多次调用发送到处理池,以及运行代码仅向处理池发送一次调用,这两种情况都会导致错误。

我还使用更简单的代码(提供了第二个代码)测试了代码的结构,该代码具有相同类型的调用函数输入,并且工作正常。我在这两段代码之间看到的唯一不同之处是第一个代码从“findiff”模块调用函数“FinDiff(axis,grid_spacing,derivative_order)”。当正常串联运行时,该函数与“Calculate_Forex_Data_Derivatives(data,grid_spacing)”一起完美地工作。

我正在使用 Anaconda 环境、Spyder 编辑器和 Windows。

任何帮助,将不胜感激。

#code that returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."

import pandas as pd
import numpy as np
from findiff import FinDiff
import multiprocessing
import concurrent.futures

def Calculate_Forex_Data_Derivatives(forex_data,dt):  #function to run in parallel
    try:
        dClose_dt = FinDiff(0,dt,1)(forex_data)[-1]
    except IndexError:
        dClose_dt = np.nan

    try:   
        d2Close_dt2 = FinDiff(0,dt,2)(forex_data)[-1]
    except IndexError:
        d2Close_dt2 = np.nan

    try:
        d3Close_dt3 = FinDiff(0,dt,3)(forex_data)[-1]
    except IndexError:
        d3Close_dt3 = np.nan

    return dClose_dt, d2Close_dt2, d3Close_dt3

#input for function
#forex_data is pandas dataframe, forex_data['Close'].values is numpy array
#dt is numpy array
#input_1 and input_2 are each a list of numpy arrays

input_1 = []
input_2 = []
for forex_data_index,data_point in enumerate(forex_data['Close'].values[:1]):
    input_1.append(forex_data['Close'].values[:forex_data_index+1])
    input_2.append(dt[:forex_data_index+1])


def multi_processing():
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index in range(len(input_1)):
            executors_list.append(executor.submit(Calculate_Forex_Data_Derivatives,input_1[index],input_2[index]))

    return executors_list

if __name__ == '__main__':
    print('calculating derivatives')
    executors_list = multi_processing()

for output in executors_list
    print(output.result()) #returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."


##############################################################


#simple example that runs fine

def function(x,y):  #function to run in parallel
    try:
        asdf
    except NameError:
        a = (x*y)[0]
        b = (x+y)[0]

    return  a,b

x=[np.array([0,1,2]),np.array([3,4,5])]    #function inputs, list of numpy arrays
y=[np.array([6,7,8]),np.array([9,10,11])]

def multi_processing():    
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index,_ in enumerate(x):
            executors_list.append(executor.submit(function,x[index],y[index]))

    return executors_list

if __name__ == '__main__':
    executors_list = multi_processing()

for output in executors_list:   #prints as expected
    print(output.result())      #(0, 6)
                                #(27, 12)

我知道破坏 ProcessPoolExecutor 管道的三种典型方法:

操作系统终止/终止

您的系统遇到限制(很可能是内存),并开始终止进程​​。由于 Windows 上的 fork 会克隆您的内存内容,因此在处理大型 DataFrame 时这并非不可能。

如何识别

  • 检查任务管理器中的内存消耗。
  • 除非你的 DataFrame 占据了你一半的内存,否则它应该消失max_workers=1,但这并不是明确的。

工人的自我终止

子进程的 Python 实例由于某些未引发适当异常的错误而终止。一个例子是导入的 C 模块中的段错误。

如何识别

由于您的代码在没有 PPE 的情况下可以正常运行,我能想到的唯一情况是某些模块不是多处理安全的。然后它也有机会消失max_workers=1。也可能通过在创建工作线程后立即手动调用该函数(调用 for 循环之后的行)来在主进程中引发错误executor.submit。 否则可能真的很难识别,但在我看来这是最不可能的情况。

PPE 代码中的例外情况

管道的子进程端(即处理通信的代码)可能会崩溃,从而导致适当的异常,不幸的是无法与主进程通信。

如何识别

由于代码(希望)经过了良好的测试,因此主要嫌疑点在于返回数据。它必须被腌制并通过套接字发回 - 这两个步骤都可能崩溃。所以你必须检查:

  • 返回数据是否可以选择?
  • 腌制的对象是否足够小,可以发送(大约2GB)?

因此,您可以尝试返回一些简单的虚拟数据,或者明确检查两个条件:

    if len(pickle.dumps((dClose_dt, d2Close_dt2, d3Close_dt3))) > 2 * 10 ** 9: 
        raise RuntimeError('return data can not be sent!')

在Python 3.7中,这个问题得到了解决,并且它发送回了异常。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何修复 BrokenProcessPool:concurrent.futures ProcessPoolExecutor 的错误 的相关文章

  • API 端点的 Django 子域配置

    我已经建立了一个 Django 项目 它使用django rest framework提供一些 ReST 功能 网站和其他功能都运行良好 然而有一个小问题 我需要我的 API 端点指向一个不同的子域 例如 当用户访问该网站时 他 她可以根据
  • 操作数无法与形状 (128,) (0,) 错误一起广播

    我正在尝试实现面部识别登录系统 但出现错误 操作数无法与形状 128 0 一起广播 我不知道什么或如何解决它 这是我已实现的 view py 和 FaceDetector py 以及我从服务器收到的错误 errors Traceback m
  • Spark MLlib - 训练隐式警告

    我在使用时不断看到这些警告trainImplicit WARN TaskSetManager Stage 246 contains a task of very large size 208 KB The maximum recommend
  • Python 遍历目录树的方法是什么?

    我觉得分配文件和文件夹并执行 item 部分有点黑客 有什么建议么 我正在使用Python 3 2 from os import from os path import def dir contents path contents list
  • Python 中的安全解除引用

    Groovy 有一个很好的安全取消引用运算符 这有助于避免 NullPointerExceptions variable method The method仅当以下情况时才会被调用variable is not null 有没有办法在 Py
  • 为什么在 Windows 中使用 GetConsoleScreenBufferInfoEx 时控制台窗口会缩小?

    我正在尝试使用 GetConsoleScreenBufferInfoEx 和 SetConsoleScreenBufferInfoEx 设置 Windows 命令行控制台的背景和前景色 我正在 Python 中使用 wintypes 进行此
  • conda 无法从 yml 创建环境

    我尝试运行下面的代码来从 YAML 文件创建虚拟 Python 环境 我在 Ubuntu 服务器上的命令行中运行代码 虚拟环境名为 py36 当我运行下面的代码时 我收到下面的消息 环境也没有被创建 这个问题是因为我有几个必须使用 pip
  • html 解析器 python

    我正在尝试解析一个网站 我正在使用 HTMLParser 模块 问题是我想解析第一个 a href 评论后 但我真的不知道该怎么做 所以我在文档中发现有一个函数叫做handle comment 但我还没有找到如何正确使用它 我有以下内容 i
  • 如何在 ReportLab 段落中插入回车符?

    有没有办法在 ReportLab 的段落中插入回车符 我试图将 n 连接到我的段落字符串 但这不起作用 Title Paragraph Title n Page myStyle 我想要这样做 因为我将名称放入单元格中 并且想要控制单元格中的
  • 字典中的列表,Python 中的循环

    我有以下代码 TYPES hotmail type hotmail lookup mixed dkim no signatures S Return Path email protected cdn cgi l email protecti
  • Python 正则表达式部分匹配或“hitEnd”

    我正在编写一个扫描器 因此我将任意字符串与正则表达式规则列表进行匹配 如果我可以模拟 Java hitEnd 功能 不仅知道正则表达式何时不匹配 还知道何时匹配 这将非常有用 can t匹配 当正则表达式匹配器在决定拒绝输入之前到达输入末尾
  • 在Python中创建一个新表

    我正在尝试从数控机床中提取数据 事件每毫秒发生一次 我需要过滤掉一些用管道 分隔的变量分隔符 PuTTy exe 程序生成的日志文件 我尝试阅读熊猫 但列不在同一位置 df pd read table data log sep 日志文件的一
  • 为 Networkx 图添加标题?

    我希望我的代码创建一个带有标题的图 使用下面的代码 可以创建绘图 但没有标题 有人可以告诉我我做错了什么吗 import pandas as pd import networkx as nx from networkx algorithms
  • 错误:无法访问文件“$libdir/plpython2”:没有这样的文件或目录

    我正在运行 postgresql 9 4 PostgreSQL 9 4 4 on x86 64 unknown linux gnu compiled by gcc GCC 4 1 2 20070626 Red Hat 4 1 2 14 64
  • 一行Python和SQLite代码,为什么需要加“,”? [复制]

    这个问题在这里已经有答案了 c execute INSERT INTO numbers VALUES random randint 0 100 如果我将上面的代码更改为 c execute INSERT INTO numbers VALUE
  • 杂乱的扭曲连接在不干净的时尚中消失了。没有代理。已经尝试过标题

    我正在尝试抓取这个网站 https www5 apply2jobs com jupitermed ProfExt index cfm fuseaction mExternal searchJobs https www5 apply2jobs
  • Spyder 如何在同一线程的后台运行 asyncio 事件循环(或者确实如此?)

    我已经研究 asyncio 模块 功能几天了 因为我想将它用于我的应用程序的 IO 绑定部分 并且我认为我现在对它的工作原理有一个合理的理解 或者在至少我认为我已经理解了以下内容 任一时刻 任一线程中只能运行一个异步事件循环 一旦一切都设置
  • 在 Python 的 Textmate 中突出显示尾随空格?

    我想做类似的事情this http remysharp com 2008 03 30 trailing white space in textmate Textmate 提示 这样当我在 Python 中编写代码时 尾随空白总是以某种方式突
  • 为什么用字符串和时间增量转置 DataFrame 会转换数据类型?

    这种行为对我来说似乎很奇怪 id列 字符串 在转置后转换为时间戳df如果另一列是时间增量 import pandas as pd df pd DataFrame id 00115 01222 32333 val 12 14 170 df v
  • 从 pandas 数据框中绘制堆积条形图

    我有数据框 payout df head 10 复制以下 Excel 绘图的最简单 最智能和最快的方法是什么 我尝试过不同的方法 但无法让一切都到位 Thanks 如果您只想要一个堆积条形图 那么一种方法是使用循环来绘制数据框中的每一列 并

随机推荐