如何修复 BrokenProcessPool:concurrent.futures ProcessPoolExecutor 的错误


使用并发.futures.ProcessPoolExecutor 我尝试运行第一段代码来并行执行函数“Calculate_Forex_Data_Derivatives(data,gride_spacing)”。当调用结果 executor_list[i].result() 时,我得到“BrokenProcessPool:进程池中的进程在 future 正在运行或挂起时突然终止”。我尝试运行代码将函数的多次调用发送到处理池,以及运行代码仅向处理池发送一次调用,这两种情况都会导致错误。


我正在使用 Anaconda 环境、Spyder 编辑器和 Windows。


#code that returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."

import pandas as pd
import numpy as np
from findiff import FinDiff
import multiprocessing
import concurrent.futures

def Calculate_Forex_Data_Derivatives(forex_data,dt):  #function to run in parallel
        dClose_dt = FinDiff(0,dt,1)(forex_data)[-1]
    except IndexError:
        dClose_dt = np.nan

        d2Close_dt2 = FinDiff(0,dt,2)(forex_data)[-1]
    except IndexError:
        d2Close_dt2 = np.nan

        d3Close_dt3 = FinDiff(0,dt,3)(forex_data)[-1]
    except IndexError:
        d3Close_dt3 = np.nan

    return dClose_dt, d2Close_dt2, d3Close_dt3

#input for function
#forex_data is pandas dataframe, forex_data['Close'].values is numpy array
#dt is numpy array
#input_1 and input_2 are each a list of numpy arrays

input_1 = []
input_2 = []
for forex_data_index,data_point in enumerate(forex_data['Close'].values[:1]):

def multi_processing():
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index in range(len(input_1)):

    return executors_list

if __name__ == '__main__':
    print('calculating derivatives')
    executors_list = multi_processing()

for output in executors_list
    print(output.result()) #returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."


#simple example that runs fine

def function(x,y):  #function to run in parallel
    except NameError:
        a = (x*y)[0]
        b = (x+y)[0]

    return  a,b

x=[np.array([0,1,2]),np.array([3,4,5])]    #function inputs, list of numpy arrays

def multi_processing():    
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index,_ in enumerate(x):

    return executors_list

if __name__ == '__main__':
    executors_list = multi_processing()

for output in executors_list:   #prints as expected
    print(output.result())      #(0, 6)
                                #(27, 12)

我知道破坏 ProcessPoolExecutor 管道的三种典型方法:


您的系统遇到限制(很可能是内存),并开始终止进程​​。由于 Windows 上的 fork 会克隆您的内存内容,因此在处理大型 DataFrame 时这并非不可能。


  • 检查任务管理器中的内存消耗。
  • 除非你的 DataFrame 占据了你一半的内存,否则它应该消失max_workers=1,但这并不是明确的。


子进程的 Python 实例由于某些未引发适当异常的错误而终止。一个例子是导入的 C 模块中的段错误。


由于您的代码在没有 PPE 的情况下可以正常运行,我能想到的唯一情况是某些模块不是多处理安全的。然后它也有机会消失max_workers=1。也可能通过在创建工作线程后立即手动调用该函数(调用 for 循环之后的行)来在主进程中引发错误executor.submit。 否则可能真的很难识别,但在我看来这是最不可能的情况。

PPE 代码中的例外情况



由于代码(希望)经过了良好的测试,因此主要嫌疑点在于返回数据。它必须被腌制并通过套接字发回 - 这两个步骤都可能崩溃。所以你必须检查:

  • 返回数据是否可以选择?
  • 腌制的对象是否足够小,可以发送(大约2GB)?


    if len(pickle.dumps((dClose_dt, d2Close_dt2, d3Close_dt3))) > 2 * 10 ** 9: 
        raise RuntimeError('return data can not be sent!')

在Python 3.7中,这个问题得到了解决,并且它发送回了异常。


