使用 asyncio 时无法使用 os.fork() 将多个进程绑定到一个套接字服务器

2023-12-01

我们都知道,使用 asyncio 可以显着提高套接字服务器的性能,如果我们能够利用 cpu 中的所有核心(可能通过多处理模块或os.fork() etc.)

我现在正在尝试构建一个多核套接字服务器演示,其中一个异步套接字服务器侦听每个核心并全部绑定到一个端口。只需创建一个异步服务器,然后使用os.fork(),让流程竞争性地运作。

然而,当我尝试分叉时,单核精细代码遇到了一些麻烦。似乎在 epoll 选择器模块中从不同进程注册相同的文件描述符存在一些问题。

我在下面显示了一些代码,有人可以帮我吗?


这是使用 asyncio 的 echo 服务器的简单、逻辑清晰的代码:

import os
import asyncio #,uvloop
from socket import *

# hendler sends back incoming message directly
async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
async def create_server(loop):
    sock = socket(AF_INET ,SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET , SO_REUSEADDR ,1)
    sock.bind(('',25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever accept a request, create a handler task in eventloop
async def serving(loop, sock):
    while True:
        client ,addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop ,client))

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()

它工作正常,直到我尝试分叉、套接字绑定之后和服务器开始服务之前。 (这个逻辑在基于同步线程的代码中工作得很好。)


当我尝试这个时:

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))

from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork process as the same number as 
        break               # my cpu cores

loop.create_task(serving(loop, sock))
loop.run_forever()

理论上分叉的进程会绑定到同一个套接字吗?并在同一个事件循环中运行?然后工作就好吗?

但是我收到这些错误消息:

Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/temp1.py", line 23, in serving
    client ,addr = await loop.sock_accept(sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
    self._sock_accept(fut, False, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
    self.add_reader(fd, self._sock_accept, fut, True, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
    (handle, None))
  File "/usr/local/lib/python3.7/selectors.py", line 359, in register
    self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists

Python版本3.7.3,

我对发生的事情完全感到困惑。

有人可以帮忙吗?谢谢


根据跟踪器问题,不支持分叉现有的异步事件循环并尝试从多个进程使用它。然而,根据尤里的评论对于同一问题,可以通过在启动循环之前分叉来实现多处理,从而在每个子进程中运行完全独立的异步循环。

您的代码实际上证实了这种可能性: whilecreate_server is async def,它不等待任何东西,也不使用loop争论。所以我们可以通过以下方式实现 Yury 的方法create_server常规功能,删除loop参数,并在之前调用它os.fork(),并且仅在分叉后运行事件循环:

import os, asyncio, socket, multiprocessing

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
def create_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever accept a request ,create a handler task in eventloop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

sock = create_server()

for num in range(multiprocessing.cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork process as the same number as 
        break               # my cpu cores

loop = asyncio.get_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 asyncio 时无法使用 os.fork() 将多个进程绑定到一个套接字服务器 的相关文章

随机推荐

  • ZenTest 自动测试不运行测试

    我有过之前的自动测试问题我通过从 ZenTest 4 1 4 降级到 4 1 3 解决了这个问题 ruby v ruby 1 8 7 2008 08 11 patchlevel 72 universal darwin10 0 rails v
  • 绘制圆的一部分

    对于 iPhone 应用程序 我想画一个圆圈 这只是填充 x 百分比 像这样的事情 我在计算半径 度数或弧度时没有任何问题 这没问题 画圆也已经完成了 但是我如何让iPhone SDK来绘制填充的部分呢 我可以画一个同样大小的矩形 但不能画
  • Google 云存储 - JAVA REST API - 获取签名不匹配

    我正在使用 jersey client 进行 REST Call 我收到 SignatureDoesNotMatch 响应错误 我试图使用 GET Service 列出 Bucket 名称 还尝试使用 GET Bucket 方法列出 Buc
  • 在 ActionBar 上添加阴影

    我试图在 ActionBar 的标题下添加阴影 我试过把属性
  • 如何使用 OpenTok 选择音频输出

    我正在构建一个简单的WebRTC应用程序与OpenTok 我需要能够选择摄像头 音频输入和音频输出 目前看来这并不容易实现 See opentok 硬件设置 https github com opentok opentok hardware
  • C#中通过代码设置列表框项的字体和颜色

    我正忙于一个自定义列表框 我将其用作 C 中的寄存器读取器 现在我想在确定的项目中设置一个确定的项目 其字体和颜色与其他项目不同 我检查了这个问题根据答案我编写了以下代码 private void myListBox DrawItem ob
  • 如何只读取英文字符

    我正在阅读一个有时包含中文和英文以外语言字符的文件 如何编写一个只读取英文单词 字母的正则表达式 难道只是 a zA Z 如果我执行上述操作 那么像 e t 这样的词仍然会被选中 但我不希望这样 t match a zA Z gt nil
  • 搜索栏,将路径颜色从黄色更改为白色

    我有两个问题 1 如何将搜索栏 路径 的颜色从黄色 默认颜色 更改为白色 我的意思是 当我滑动拇指时 它会将穿过的线从灰色变为黄色 我希望轨道 线路保持灰色或白色 基本上 我只想移动拇指 而搜索栏中没有颜色变化 2 如何将搜索栏的拇指从矩形
  • Valgrind 检测到仍然存在泄漏

    本块中提到的所有函数都是库函数 我怎样才能纠正这个内存泄漏 它列在 仍然可达 类别 还有 4 个 非常相似 但大小不同 630 bytes in 1 blocks are still reachable in loss record 5 o
  • 不使用 libSystem macOS 链接目标文件

    我正在为 x86 64 上的 macOS 编写一个编译器 但是当我将目标文件链接在一起时 ld says ld dynamic main executables must link with libSystem dylib for infe
  • 为什么只有部分设备会收到推送通知

    我设置了一个推送通知服务 根据 RSS 源向客户端发送通知 我有一项服务每分钟运行一次 以查看是否有新帖子添加到提要中 如果是这样 该服务将向所有客户端发送通知 然而 一些人一直抱怨说他们没有收到任何推送通知 这是我用来发送消息的函数 fu
  • 多次克隆 NumPy 数组

    我将一张图片加载到 numpy 数组中 需要将其图片设置为 2 个不同的阈值 import numpy as np import cv2 cap cv2 Videocapture 0 pic cap read pic1 pic pic2 p
  • 受限 CRTP 过早拒绝

    我正在尝试实现一个从基模板继承的派生类 并将派生类作为其模板参数 希望下面的示例能够澄清问题 template
  • 如何在 Windows 8 中重复使用在 Mac 上创建的软链接

    我在 MacBook Pro 中创建的 1000 张图像说 我的软链接很少 我正在 iOS 应用程序中使用它们 现在我正在 Windows 8 手机应用程序中移植相同的应用程序 因此我想在 Windows Phone 8 应用程序中重用相同
  • Java 中使用派生类型作为参数的方法重载

    假设我有现有的代码 我想扩展它 但又想尽可能避免更改它 这段代码周围有一个接收某种类型的方法 Engine method Base b 现在 我想扩展这个功能 因此 我将 Base 扩展为一个名为 Derived 的类型 它包含我需要的更多
  • 如何在 angular2 中应用主题?

    我需要为我在 angular2 中开发的 Web 应用程序提供两个主题 红色 蓝色 当我更改主题时 所有组件都应该反映它吗 在 Angular2 中应用主题的最佳实践是什么 您可以使用文件代币从 angular platform b row
  • JDBCPreparedStatement导致MySQL语法错误

    我收到错误 您的 SQL 语法有错误 请检查与您的 MySQL 服务器版本对应的手册 了解在第 1 行 orderr 附近使用的正确语法 所以我认为错误是我使用了两个 但在我的代码中我没有使用任何 注意该表实际上被命名为 order pub
  • 如何将 cv::MAT 转换为 NHCW 格式?

    在User Guide html中 tensorRT的输入 输出需要使用NCHW格式 什么是 NCHW 格式 如何将 cv MAT 转换为 NCHW 格式 我使用 TensorRT 运行推理 如下代码所示 没有任何错误 但是 这不是正确的输
  • 用于 WPF 富客户端应用程序的图像编辑器组件

    您是否知道可在 WPF 客户端应用程序中使用的简单 NET 组件 该组件具有以下功能 将图像从文件或剪贴板加载到画布中 缩放和移动 定义画布背景颜色 将画布导出为新图像 提前谢谢 普伦森 这些呢 http xdraw codeplex co
  • 使用 asyncio 时无法使用 os.fork() 将多个进程绑定到一个套接字服务器

    我们都知道 使用 asyncio 可以显着提高套接字服务器的性能 如果我们能够利用 cpu 中的所有核心 可能通过多处理模块或os fork etc 我现在正在尝试构建一个多核套接字服务器演示 其中一个异步套接字服务器侦听每个核心并全部绑定