多处理中的加入超时

2024-02-29

我有一个虚拟示例,我想在其中应用多重处理。考虑一个场景,其中有一串数字(我称之为帧)逐一传入。我想将其分配给当前可用的任何单个进程。所以我创建了 4 个正在运行的进程while循环,查看队列中是否有任何元素,然后对其应用函数。

问题是当我join它,它陷入任何while循环,即使我关闭了while在它之前循环。但不知怎的,它被困在里面了。

Code:

# step 1, 4 processes
import multiprocessing as mp
import os
import time

class MpListOperations:
    def __init__(self):
        self.results_queue = mp.Manager().Queue()
        self.frames_queue = mp.Manager().Queue()
        self.flag = mp.Manager().Value(typecode='b',value=True)
        self.list_nums = list(range(0,5000))


    def process_list(self):
        print(f"Process id {os.getpid()} started")
        while self.flag.value:
#             print(self.flag.value)
            if self.frames_queue.qsize():
                self.results_queue.put(self.frames_queue.get()**2)


    def create_processes(self, no_of_processes = mp.cpu_count()):
        print("Creating Processes")
        self.processes = [mp.Process(target=self.process_list) for _ in range(no_of_processes)]

    def start_processes(self):
        print(f"starting processes")
        for process in self.processes:
            process.start()

    def join_process(self):
        print("Joining Processes")
        while True:
            if not self.frames_queue.qsize():
                self.flag.value=False
                print("JOININNG HERE")
                for process in self.processes:
                    exit_code = process.join()
                    print(exit_code)
                print("BREAKING DONE")
                break

    def stream_frames(self):
        print("Streaming Frames")
        for frame in self.list_nums:
            self.frames_queue.put(frame)


if __name__=="__main__":
    start = time.time()
    mp_ops = MpListOperations()
    mp_ops.create_processes()
    mp_ops.start_processes()
    mp_ops.stream_frames()
    mp_ops.join_process()
    print(time.time()-start)

现在如果我添加一个超时参数join, even 0, i.e exit_code = process.join(0)有用。我想了解在这种情况下,如果这段代码是正确的,超时的值应该是多少?为什么它在超时的情况下工作而不是在没有超时的情况下工作?用它实现多处理的正确方法是什么?


如果您查看托管队列的文档,您将看到qsize方法仅返回近似大小。因此,当所有项目都已从帧队列中取出时,我不会使用它进行测试。大概您希望让进程运行直到处理完所有帧。我知道的最简单的方法就是把 Nsentinel放置实际帧后帧队列上的项目,其中 N 是从队列获取的进程数。哨兵项是一个特殊值,不能被误认为是实际帧,并向进程发出信号,表明没有更多项可以从队列中获取(即准文件结束项)。在这种情况下我们可以使用None作为哨兵项目。然后,每个进程继续对队列执行获取操作,直到看到哨兵项,然后终止。因此没有必要self.flag属性。

这是更新和简化的代码。我还做了一些已评论的其他细微更改:

import multiprocessing as mp
import os
import time

class MpListOperations:
    def __init__(self):
        # Only create one manager process:
        manager = mp.Manager()
        self.results_queue = manager.Queue()
        self.frames_queue = manager.Queue()
        # No need to convert range to a list:
        self.list_nums = range(0, 5000)


    def process_list(self):
        print(f"Process id {os.getpid()} started")
        while True:
            frame = self.frames_queue.get()
            if frame is None: # Sentinel?
                # Yes, we are done:
                break
            self.results_queue.put(frame ** 2)


    def create_processes(self, no_of_processes = mp.cpu_count()):
        print("Creating Processes")
        self.no_of_processes = no_of_processes
        self.processes = [mp.Process(target=self.process_list) for _ in range(no_of_processes)]

    def start_processes(self):
        print("Starting Processes")
        for process in self.processes:
            process.start()

    def join_processes(self):
        print("Joining Processes")
        for process in self.processes:
            # join returns None:
            process.join()

    def stream_frames(self):
        print("Streaming Frames")
        for frame in self.list_nums:
            self.frames_queue.put(frame)
        # Put sentinels:
        for _ in range(self.no_of_processes):
            self.frames_queue.put(None)


if __name__== "__main__":
    start = time.time()
    mp_ops = MpListOperations()
    mp_ops.create_processes()
    mp_ops.start_processes()
    mp_ops.stream_frames()
    mp_ops.join_processes()
    print(time.time()-start)

Prints:

Creating Processes
Starting Processes
Process id 28 started
Process id 29 started
Streaming Frames
Process id 33 started
Process id 31 started
Process id 38 started
Process id 44 started
Process id 42 started
Process id 45 started
Joining Processes
2.3660173416137695

Windows 的注意事项

我修改了方法start_processes临时设置属性self.processes to None:

    def start_processes(self):
        print("Starting Processes")
        processes = self.processes
        # Don't try to pickle list of processes:
        self.processes = None
        for process in processes:
            process.start()
        # Restore attribute:
        self.processes = processes

否则,在 Windows 下,我们会在尝试序列化/反序列化包含两个或多个进程的进程列表时遇到 pickle 错误multiprocessing.Process实例。错误是“TypeError:无法腌制'weakref'对象。”这可以通过以下代码进行演示,我们首先尝试pickle 1 个进程的列表,然后是2 个进程的列表:

import multiprocessing as mp
import os

class Foo:
    def __init__(self, number_of_processes):
        self.processes = [mp.Process(target=self.worker) for _ in range(number_of_processes)]
        self.start_processes()
        self.join_processes()

    def start_processes(self):
        processes = self.processes
        for process in self.processes:
            process.start()

    def join_processes(self):
        for process in self.processes:
            process.join()

    def worker(self):
        print(f"Process id {os.getpid()} started")
        print(f"Process id {os.getpid()} ended")


if __name__== "__main__":
    foo = Foo(1)
    foo = Foo(2)

Prints:

Process id 7540 started
Process id 7540 ended
Traceback (most recent call last):
  File "C:\Booboo\test\test.py", line 26, in <module>
    foo = Foo(2)
  File "C:\Booboo\test\test.py", line 7, in __init__
    self.start_processes()
  File "C:\Booboo\test\test.py", line 13, in start_processes
    process.start()
  File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 327, in _Popen
    return Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
Process id 18152 started
Process id 18152 ended
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

多处理中的加入超时 的相关文章

随机推荐

  • 使用 browserhistory 更改 url 反应路由但没有任何反应

    我正在尝试让反应路由器工作 这是我的代码 var hashHistory require react router dom hashHistory var BrowserRouter require react router dom Bro
  • xml 文件中的错误标记中的数据过早结束

    我制作了这个系统 但它不知道发生了什么 行 197 core php foreach this gt getAll as banner 行 191 core php xmlBanners simplexml load file PATH X
  • 使用 boost asio 创建 iostream 并指定 ip 和端口

    我有一个关于 boost asio 库的问题 我成功地尝试在客户端和服务器之间创建一个套接字 这涉及创建解析器以便指定服务器的IP和端口 服务器只需要端口 和其他对象 但是 最重要的是 有必要使用write and read some作为从
  • x86 组装pushad/popad,速度有多快?

    我只是想在 x86 汇编中制作非常快速的基于计算的程序 但我需要在调用程序之前推送累加器 计数器和数据寄存器 手动推送它们更快 push eax push ecx push edx 或者只是使用 pushad 和 pop 一样 谢谢 如果你
  • 我可以在iOS7中更改datePicker的字体颜色吗?

    刚刚下载了我的 xcode 5 副本 想知道是否有人知道如何更改日期选择器中字体的颜色或大小 我的应用程序需要类似的东西 但最终还是走了很长一段路 遗憾的是 没有更简单的方法可以简单地切换到 UIDatePicker 的白色文本版本 下面的
  • 主机卡模拟 Android 4.4

    既然 Android 在带有 Android 4 4 的 Nexus 5 上支持卡模拟 是否可能会出现一些应用程序 允许我有效地复制智能卡 例如 在办公室开门的智能卡 并在其位置使用我的手机 会非常方便 Cheers 不 这不太可能 通常
  • 远程进程执行

    我的场景是使用 C Net 连接到远程计算机 并列出该远程计算机的所有进程 我可以终止一个进程 或者远程启动一个新进程 问题是 当我在远程执行新进程时 我可以在任务管理器上看到该进程 但它不会出现在 Windows 屏幕上 知道为什么它没有
  • 发布后运行 exe 时出错:“此程序可能未正确安装”

    为什么在我发布项目后 我无法在关闭项目后仅运行我的 exe 您在 调试 文件夹中找到的那个 而不收到此错误消息 因此不使用我发布的项目 仍然只使用 exe 我创建了一个测试项目只是为了测试是否是这种情况 并且每次都会发生这种情况 创建一个新
  • 如何在Elasticsearch中查询IP范围?

    我想在ELK中查询IP范围 172 16 0 0到172 31 0 0 我尝试了两种查询方法 但都失败了 query bool should regexp DstIP 172 3 0 1 1 6 9 2 0 9 minimum should
  • 如何在 Awk 中使用单个正则表达式提取多个字符串

    我有以下字符串 Mike has XXX cats and XXXXX dogs MikehasXXXcatsandXXXXXdogs 我想将 Xs 替换为与 Xs 数量相对应的数字 I tried awk match 0 X a leng
  • 无法安装文本

    使用命令pip install textract我无法在 Ubuntu 16 04 Python 2 上安装 texttract 我收到以下错误 Collecting textract Requirement already satisfi
  • 如何从CollectionType中获取随机元素?

    这是我的示例代码 import Foundation ar4random uniform extension CollectionType where Self Index Distance Int var randomElement Se
  • 是否可以在模拟器上使用已发布的即时应用程序(BuzzFeed、Wish 等)?

    我已经在模拟器上运行了一个 hello world 即时应用程序 如下这个谷歌的教程 https developer android com topic instant apps getting started first instant
  • 如何在 PHP 中实现数字签名

    我必须在我们用 PHP 打印为 PDF 的证书中实现数字签名 该要求类似于注册商或子注册商等授权人员对证书进行数字签名 实施数字签名背后的基本思想是要知道 证书是由授权人数字签名的 并且是该授权人或其他没有权限的人创建的 注册商 分注册商对
  • PostgreSQL 有一个好的数据库建模工具吗? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 在同一项目中跨表单使用类[关闭]

    Closed 这个问题需要细节或清晰度 help closed questions 目前不接受答案 假设我有一个具有几种不同形式的项目 Form1 Form2 Form3我还有一堂课叫CustomTools其中包含常用的验证程序和功能 cl
  • 计算两条曲线之间的面积

    我有一个包含曲线和直线的代码 我知道如何填充线下方和下方的区域 但我需要计算每个区域的面积值 这是代码 import matplotlib pyplot as plt import numpy as np x np arange 0 0 2
  • mamp mysql 无法启动

    我的 mysql 服务器有问题 当我启动 MAMP 时 Mysql 服务器不会启动 我尝试更改端口但没有帮助 我检查错误日志 发现以下内容 161010 09 21 07 mysqld safe Starting mysqld daemon
  • iPhone iOS UILabel 如何仅自定义 UITableView 详细文本标签的文本颜色?

    我正在开发一个界面原型 并使用故事板来实现 原型的一部分涉及将 UITableView 单元格的详细 UILabel 设置为某种颜色 我想避免必须手动重新着色故事板中的每个标签 我发现我可以使用 UILabel appearanceWhen
  • 多处理中的加入超时

    我有一个虚拟示例 我想在其中应用多重处理 考虑一个场景 其中有一串数字 我称之为帧 逐一传入 我想将其分配给当前可用的任何单个进程 所以我创建了 4 个正在运行的进程while循环 查看队列中是否有任何元素 然后对其应用函数 问题是当我jo