我想提交一个动态加载的函数到concurrent.futures.ProcessPoolExecutor
。这是一个例子。有module.py
其中包含该功能。
# Content of module.py
def func():
return 1
然后,剩下的就在file.py
# Content of file.py
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import importlib
from pathlib import Path
import inspect
def load_function_from_module(path):
spec = importlib.util.spec_from_file_location(path.stem, str(path))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def func_top_level():
return 2
if __name__ == '__main__':
# Dynamically load function from other module.
path = Path(__file__).parent / "module.py"
func = dict(inspect.getmembers(load_function_from_module(path)))["func"]
with ProcessPoolExecutor(2) as executor:
future = executor.submit(func)
future_ = executor.submit(func_top_level)
# Here comes the exception.
print(future.result())
回溯是
Traceback (most recent call last):
_pickle.PicklingError: Can't pickle <function func at 0x7f5a548eb050>: it's not the same object as module.func
解决方案 1:包裹func
具有顶级功能
Place def myfunc(): return func()
函数加载并提交后myfunc
.
这适用于本示例,但是一旦您移动整个if __name__ ...
块成自己的main()
功能,myfunc()
再次变为本地并且 hack 不起作用。由于问题发生在我的应用程序深处,因此这是不可能的。
尝试 2:更换pickle
with cloudpickle
我个人最喜欢的解决方案是改变方式ProcessPoolExecutor
序列化对象。例如,cloudpickle
可以序列化func
.
虽然,这answer https://stackoverflow.com/a/45316233/7523785建议可以注册自定义减速器,以下 PR 和问题表明该功能不起作用或者我只是无法替换pickle
with cloudpickle
.
- https://bugs.python.org/issue28053 https://bugs.python.org/issue28053
- https://github.com/python/cpython/pull/9959 https://github.com/python/cpython/pull/9959
- https://github.com/python/cpython/pull/15058 https://github.com/python/cpython/pull/15058
非常感谢您的帮助。