使用并发.futures.ProcessPoolExecutor 我尝试运行第一段代码来并行执行函数“Calculate_Forex_Data_Derivatives(data,gride_spacing)”。当调用结果 executor_list[i].result() 时,我得到“BrokenProcessPool:进程池中的进程在 future 正在运行或挂起时突然终止”。我尝试运行代码将函数的多次调用发送到处理池,以及运行代码仅向处理池发送一次调用,这两种情况都会导致错误。
我还使用更简单的代码(提供了第二个代码)测试了代码的结构,该代码具有相同类型的调用函数输入,并且工作正常。我在这两段代码之间看到的唯一不同之处是第一个代码从“findiff”模块调用函数“FinDiff(axis,grid_spacing,derivative_order)”。当正常串联运行时,该函数与“Calculate_Forex_Data_Derivatives(data,grid_spacing)”一起完美地工作。
我正在使用 Anaconda 环境、Spyder 编辑器和 Windows。
任何帮助,将不胜感激。
#code that returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."
import pandas as pd
import numpy as np
from findiff import FinDiff
import multiprocessing
import concurrent.futures
def Calculate_Forex_Data_Derivatives(forex_data,dt): #function to run in parallel
try:
dClose_dt = FinDiff(0,dt,1)(forex_data)[-1]
except IndexError:
dClose_dt = np.nan
try:
d2Close_dt2 = FinDiff(0,dt,2)(forex_data)[-1]
except IndexError:
d2Close_dt2 = np.nan
try:
d3Close_dt3 = FinDiff(0,dt,3)(forex_data)[-1]
except IndexError:
d3Close_dt3 = np.nan
return dClose_dt, d2Close_dt2, d3Close_dt3
#input for function
#forex_data is pandas dataframe, forex_data['Close'].values is numpy array
#dt is numpy array
#input_1 and input_2 are each a list of numpy arrays
input_1 = []
input_2 = []
for forex_data_index,data_point in enumerate(forex_data['Close'].values[:1]):
input_1.append(forex_data['Close'].values[:forex_data_index+1])
input_2.append(dt[:forex_data_index+1])
def multi_processing():
executors_list = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
for index in range(len(input_1)):
executors_list.append(executor.submit(Calculate_Forex_Data_Derivatives,input_1[index],input_2[index]))
return executors_list
if __name__ == '__main__':
print('calculating derivatives')
executors_list = multi_processing()
for output in executors_list
print(output.result()) #returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."
##############################################################
#simple example that runs fine
def function(x,y): #function to run in parallel
try:
asdf
except NameError:
a = (x*y)[0]
b = (x+y)[0]
return a,b
x=[np.array([0,1,2]),np.array([3,4,5])] #function inputs, list of numpy arrays
y=[np.array([6,7,8]),np.array([9,10,11])]
def multi_processing():
executors_list = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
for index,_ in enumerate(x):
executors_list.append(executor.submit(function,x[index],y[index]))
return executors_list
if __name__ == '__main__':
executors_list = multi_processing()
for output in executors_list: #prints as expected
print(output.result()) #(0, 6)
#(27, 12)