将动态加载的函数提交到 ProcessPoolExecutor

2024-03-14

我想提交一个动态加载的函数到concurrent.futures.ProcessPoolExecutor。这是一个例子。有module.py其中包含该功能。

# Content of module.py

def func():
    return 1

然后,剩下的就在file.py

# Content of file.py

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import importlib
from pathlib import Path
import inspect


def load_function_from_module(path):
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)

    return mod


def func_top_level():
    return 2


if __name__ == '__main__':
    # Dynamically load function from other module.
    path = Path(__file__).parent / "module.py"
    func = dict(inspect.getmembers(load_function_from_module(path)))["func"]

    with ProcessPoolExecutor(2) as executor:
        future = executor.submit(func)
        future_ = executor.submit(func_top_level)

    # Here comes the exception.
    print(future.result())

回溯是

Traceback (most recent call last):
_pickle.PicklingError: Can't pickle <function func at 0x7f5a548eb050>: it's not the same object as module.func

解决方案 1:包裹func具有顶级功能

Place def myfunc(): return func()函数加载并提交后myfunc.

这适用于本示例,但是一旦您移动整个if __name__ ...块成自己的main()功能,myfunc()再次变为本地并且 hack 不起作用。由于问题发生在我的应用程序深处,因此这是不可能的。

尝试 2:更换pickle with cloudpickle

我个人最喜欢的解决方案是改变方式ProcessPoolExecutor序列化对象。例如,cloudpickle可以序列化func.

虽然,这answer https://stackoverflow.com/a/45316233/7523785建议可以注册自定义减速器,以下 PR 和问题表明该功能不起作用或者我只是无法替换pickle with cloudpickle.

  • https://bugs.python.org/issue28053 https://bugs.python.org/issue28053
  • https://github.com/python/cpython/pull/9959 https://github.com/python/cpython/pull/9959
  • https://github.com/python/cpython/pull/15058 https://github.com/python/cpython/pull/15058

非常感谢您的帮助。


我使用一个简单的包装类来修补cloudpickle into ProcessPoolExecutor,像这样:

from concurrent.futures import ProcessPoolExecutor
import cloudpickle

def apply_cloudpickle(fn, /, *args, **kwargs):
    fn = cloudpickle.loads(fn)
    return fn(*args, **kwargs)

class CloudpickleProcessPoolExecutor(ProcessPoolExecutor):
    def submit(self, fn, /, *args, **kwargs):
        return super().submit(apply_cloudpickle, cloudpickle.dumps(fn), *args, **kwargs)

然后当你打电话时.map和其他方法,cloudpickle将在引擎盖下酸洗函数对象。 (请注意,参数仍然会贯穿普通的pickle管道。)

(同样的策略适用于dill etc.)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将动态加载的函数提交到 ProcessPoolExecutor 的相关文章

  • Django 管理员在模型编辑时间歇性返回 404

    我们使用 Django Admin 来维护导出到我们的一些站点的一些数据 有时 当单击标准更改列表视图来获取模型编辑表单而不是路由到正确的页面时 我们会得到 Django 404 页面 模板 它是偶尔发生的 我们可以通过重新加载三次来重现它
  • 将数据从 python pandas 数据框导出或写入 MS Access 表

    我正在尝试将数据从 python pandas 数据框导出到现有的 MS Access 表 我想用已更新的数据替换 MS Access 表 在 python 中 我尝试使用 pandas to sql 但收到错误消息 我觉得很奇怪 使用 p
  • 为 Anaconda Python 安装 psycopg2

    我有 Anaconda Python 3 4 但是每当我运行旧代码时 我都会通过输入 source activate python2 切换到 Anaconda Python 2 7 我的问题是我为 Anaconda Python 3 4 安
  • Python(Selenium):如何通过登录重定向/组织登录登录网站

    我不是专业程序员 所以请原谅任何愚蠢的错误 我正在做一些研究 我正在尝试使用 Selenium 登录数据库来搜索大约 1000 个术语 我有两个问题 1 重定向到组织登录页面后如何使用 Selenium 登录 2 如何检索数据库 在我解决
  • 通过最小元素比较对 5 个元素进行排序

    我必须在 python 中使用元素之间的最小比较次数来建模对 5 个元素的列表进行排序的执行计划 除此之外 复杂性是无关紧要的 结果是一个对的列表 表示在另一时间对列表进行排序所需的比较 我知道有一种算法可以通过 7 次比较 总是在元素之间
  • 从字符串中删除识别的日期

    作为输入 我有几个包含不同格式日期的字符串 例如 彼得在16 45 我的生日是1990年7月8日 On 7 月 11 日星期六我会回家 I use dateutil parser parse识别字符串中的日期 在下一步中 我想从字符串中删除
  • 如何使用 Ansible playbook 中的 service_facts 模块检查服务是否存在且未安装在服务器中?

    我用过service facts检查服务是否正在运行并启用 在某些服务器中 未安装特定的软件包 现在 我如何知道这个特定的软件包没有安装在该特定的服务器上service facts module 在 Ansible 剧本中 它显示以下错误
  • 根据列值突出显示数据框中的行?

    假设我有这样的数据框 col1 col2 col3 col4 0 A A 1 pass 2 1 A A 2 pass 4 2 A A 1 fail 4 3 A A 1 fail 5 4 A A 1 pass 3 5 A A 2 fail 2
  • OpenCV 无法从 MacBook Pro iSight 捕获

    几天后 我无法再从 opencv 应用程序内部打开我的 iSight 相机 cap cv2 VideoCapture 0 返回 并且cap isOpened 回报true 然而 cap grab 刚刚返回false 有任何想法吗 示例代码
  • 如何使用 OpencV 从 Firebase 读取图像?

    有没有使用 OpenCV 从 Firebase 读取图像的想法 或者我必须先下载图片 然后从本地文件夹执行 cv imread 功能 有什么办法我可以使用cv imread link of picture from firebase 您可以
  • AWS EMR Spark Python 日志记录

    我正在 AWS EMR 上运行一个非常简单的 Spark 作业 但似乎无法从我的脚本中获取任何日志输出 我尝试过打印到 stderr from pyspark import SparkContext import sys if name m
  • BeautifulSoup 中的嵌套标签 - Python

    我在网站和 stackoverflow 上查看了许多示例 但找不到解决我的问题的通用解决方案 我正在处理一个非常混乱的网站 我想抓取一些数据 标记看起来像这样 table tbody tr tr tr td td td table tr t
  • 添加不同形状的 numpy 数组

    我想添加两个不同形状的 numpy 数组 但不进行广播 而是将 缺失 值视为零 可能最简单的例子是 1 2 3 2 gt 3 2 3 or 1 2 3 2 1 gt 3 2 3 1 0 0 我事先不知道形状 我正在弄乱每个 np shape
  • 如何在ipywidget按钮中显示全文?

    我正在创建一个ipywidget带有一些文本的按钮 但按钮中未显示全文 我使用的代码如下 import ipywidgets as widgets from IPython display import display button wid
  • Python 的“zip”内置函数的 Ruby 等价物是什么?

    Ruby 是否有与 Python 内置函数等效的东西zip功能 如果不是 做同样事情的简洁方法是什么 一些背景信息 当我试图找到一种干净的方法来进行涉及两个数组的检查时 出现了这个问题 如果我有zip 我可以写这样的东西 zip a b a
  • Pandas:merge_asof() 对多行求和/不重复

    我正在处理两个数据集 每个数据集具有不同的关联日期 我想合并它们 但因为日期不完全匹配 我相信merge asof 是最好的方法 然而 有两件事发生merge asof 不理想的 数字重复 数字丢失 以下代码是一个示例 df a pd Da
  • Fabric env.roledefs 未按预期运行

    On the 面料网站 http docs fabfile org en 1 10 usage execution html 给出这个例子 from fabric api import env env roledefs web hosts
  • 如何在 Python 中追加到 JSON 文件?

    我有一个 JSON 文件 其中包含 67790 1 kwh 319 4 现在我创建一个字典a dict我需要将其附加到 JSON 文件中 我尝试了这段代码 with open DATA FILENAME a as f json obj js
  • Scrapy:如何使用元在方法之间传递项目

    我是 scrapy 和 python 的新手 我试图将 parse quotes 中的项目 item author 传递给下一个解析方法 parse bio 我尝试了 request meta 和 response meta 方法 如 sc
  • 在 Qt 中自动调整标签文本大小 - 奇怪的行为

    在 Qt 中 我有一个复合小部件 它由排列在 QBoxLayouts 内的多个 QLabels 组成 当小部件调整大小时 我希望标签文本缩放以填充标签区域 并且我已经在 resizeEvent 中实现了文本大小的调整 这可行 但似乎发生了某

随机推荐