python中是否可以同时运行多个asyncio?

2024-01-01

根据我得到的解决方案:在 python 中使用 asyncio 运行多个套接字 https://stackoverflow.com/questions/62571622/running-multiple-sockets-using-asyncio-in-python

我尝试使用 asyncio 添加计算部分

设置:Python 3.7.4

import msgpack
import threading
import os
import asyncio
import concurrent.futures
import functools
import nest_asyncio
nest_asyncio.apply()

class ThreadSafeElem(bytes):
  def __init__(self, * p_arg, ** n_arg):
     self._lock = threading.Lock()
  def __enter__(self):
     self._lock.acquire()
     return self
  def __exit__(self, type, value, traceback):
     self._lock.release()

elem = ThreadSafeElem()

async def serialize(data):
   return msgpack.packb(data, use_bin_type=True)
async def serialize1(data1):
   return msgpack.packb(data1, use_bin_type=True)

async def process_data(data,data1):
   loop = asyncio.get_event_loop()
   future = await loop.run_in_executor(None, functools.partial(serialize, data))
   future1 = await loop.run_in_executor(None, functools.partial(serialize1, data1))
   return   await asyncio.gather(future,future1)

 ################ Calculation#############################
def calculate_data():
  global elem
  while True:
      try:
          ... data is calculated (some dictionary))...
          elem, elem1= asyncio.run(process_data(data, data1))
      except:
          pass
#####################################################################
def get_data():
  return elem
def get_data1():
  return elem1
########### START SERVER AND get data contionusly ################
async def client_thread(reader, writer):
  while True:
    try:
        bytes_received = await reader.read(100) 
        package_type = np.frombuffer(bytes_received, dtype=np.int8)
        if package_type ==1 :
           nn_output = get_data1()
        if package_type ==2 :
           nn_output = get_data()               
        writer.write(nn_output)
        await writer.drain()
    except:
        pass

async def start_servers(host, port):
  server = await asyncio.start_server(client_thread, host, port)
  await server.serve_forever()

async def start_calculate():
  await asyncio.run(calculate_data())

def enable_sockets():
 try:
    host = '127.0.0.1'
    port = 60000
    sockets_number = 6
    loop = asyncio.get_event_loop()
    for i in range(sockets_number):
        loop.create_task(start_servers(host,port+i))
    loop.create_task(start_calculate())
    loop.run_forever()
except:
    print("weird exceptions")
##############################################################################

enable_sockets()   

问题是,当我从客户端拨打电话时,服务器不会给我任何信息。

我用虚拟数据测试了程序,计算部分没有异步,所以没有这个循环.create_task(start_calculate())并且服务器响应正确。

我还运行计算数据,而不将其添加到启用套接字中,并且它有效。它也适用于此实现,但问题是服务器没有返回任何内容。

我这样做是因为我需要计算部分连续运行,并且当其中一个客户端调用时返回数据。


An asyncio事件循环不能嵌套在另一个事件循环中,这样做是没有意义的:asyncio.run(和类似的)blocks当前线程直到完成。这不会增加并行性,而只是禁用任何外部事件循环。

如果你想再嵌套一个asyncio任务,直接在当前事件循环中运行它。如果你想运行一个非合作的、阻塞的任务,在事件循环执行器中运行它 https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor.

async def start_calculate():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, calculate_data)

默认执行器使用线程——这允许运行阻塞任务,但不会增加并行性。使用自定义ProcessPoolExecutor使用额外的核心:

import concurrent.futures

async def start_calculate():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        await loop.run_in_executor(pool, calculate_data)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

python中是否可以同时运行多个asyncio? 的相关文章

  • Python PAM 模块的安全问题?

    我有兴趣编写一个 PAM 模块 该模块将利用流行的 Unix 登录身份验证机制 我过去的大部分编程经验都是使用 Python 进行的 并且我正在交互的系统已经有一个 Python API 我用谷歌搜索发现pam python http pa
  • Flask 和 uWSGI - 无法加载应用程序 0 (mountpoint='')(找不到可调用或导入错误)

    当我尝试使用 uWSGI 启动 Flask 时 出现以下错误 我是这样开始的 gt cd gt root localhost uwsgi socket 127 0 0 1 6000 file path to folder run py ca
  • pandas 替换多个值

    以下是示例数据框 gt gt gt df pd DataFrame a 1 1 1 2 2 b 11 22 33 44 55 gt gt gt df a b 0 1 11 1 1 22 2 1 33 3 2 44 4 3 55 现在我想根据
  • 如何使用包含代码的“asyncio.sleep()”进行单元测试?

    我在编写 asyncio sleep 包含的单元测试时遇到问题 我要等待实际的睡眠时间吗 I used freezegun到嘲笑时间 当我尝试使用普通可调用对象运行测试时 这个库非常有用 但我找不到运行包含 asyncio sleep 的测
  • 打破嵌套循环[重复]

    这个问题在这里已经有答案了 有没有比抛出异常更简单的方法来打破嵌套循环 在Perl https en wikipedia org wiki Perl 您可以为每个循环指定标签 并且至少继续一个外循环 for x in range 10 fo
  • Python tcl 未正确安装

    我刚刚为 python 安装了graphics py 但是当我尝试运行以下代码时 from graphics import def main win GraphWin My Circle 100 100 c Circle Point 50
  • __del__ 真的是析构函数吗?

    我主要用 C 做事情 其中 析构函数方法实际上是为了销毁所获取的资源 最近我开始使用python 这真的很有趣而且很棒 我开始了解到它有像java一样的GC 因此 没有过分强调对象所有权 构造和销毁 据我所知 init 方法对我来说在 py
  • 在pyyaml中表示具有相同基类的不同类的实例

    我有一些单元测试集 希望将每个测试运行的结果存储为 YAML 文件以供进一步分析 YAML 格式的转储数据在几个方面满足我的需求 但测试属于不同的套装 结果有不同的父类 这是我所拥有的示例 gt gt gt rz shorthand for
  • Abaqus 将曲面转化为集合

    我一直试图在模型中找到两个表面的中心 参见照片 但未能成功 它们是元素表面 面 查询中没有选项可以查找元素表面的中心 只能查找元素集的中心 找到节点集的中心也很好 但是我的节点集没有出现在工具 gt 查询 gt 质量属性选项中 而且我找不到
  • 当玩家触摸屏幕一侧时,如何让 pygame 发出警告?

    我使用 pygame 创建了一个游戏 当玩家触摸屏幕一侧时 我想让 pygame 给出类似 你不能触摸屏幕两侧 的错误 我尝试在互联网上搜索 但没有找到任何好的结果 我想过在屏幕外添加一个方块 当玩家触摸该方块时 它会发出警告 但这花了很长
  • Geopandas 设置几何图形:MultiPolygon“等于 len 键和值”的 ValueError

    我有 2 个带有几何列的地理数据框 我将一些几何图形从 1 个复制到另一个 这对于多边形效果很好 但对于任何 有效 多多边形都会返回 ValueError 请指教如何解决这个问题 我不知道是否 如何 为什么应该更改 MultiPolygon
  • Numpy 优化

    我有一个根据条件分配值的函数 我的数据集大小通常在 30 50k 范围内 我不确定这是否是使用 numpy 的正确方法 但是当数字超过 5k 时 它会变得非常慢 有没有更好的方法让它更快 import numpy as np N 5000
  • 如何改变Python中特定打印字母的颜色?

    我正在尝试做一个简短的测验 并且想将错误答案显示为红色 欢迎来到我的测验 您想开始吗 是的 祝你好运 法国的首都是哪里 法国 随机答案不正确的答案 我正在尝试将其显示为红色 我的代码是 print Welcome to my Quiz be
  • Python 3 中“map”类型的对象没有 len()

    我在使用 Python 3 时遇到问题 我得到了 Python 2 7 代码 目前我正在尝试更新它 我收到错误 类型错误 map 类型的对象没有 len 在这部分 str len seed candidates 在我像这样初始化它之前 se
  • 为美国东部以外地区的 Cloudwatch 警报发送短信?

    AWS 似乎没有为美国东部以外的 SNS 主题订阅者提供 SMS 作为协议 我想连接我的 CloudWatch 警报并在发生故障时接收短信 但无法将其发送到 SMS YES 经过一番挖掘后 我能够让它发挥作用 它比仅仅选择一个主题或输入闹钟
  • 类型错误:只能使用标量值执行操作

    如果您能让我知道如何为所提供的表格绘制一些信息丰富的图表 我将不胜感激here https www iasplus com en resources ifrs topics use of ifrs 例如 我需要一个名为 国内非上市公司 非上
  • 用于运行可执行文件的python多线程进程

    我正在尝试将一个在 Windows 上运行可执行文件并管理文本输出文件的 python 脚本升级到使用多线程进程的版本 以便我可以利用多个核心 我有四个独立版本的可执行文件 每个线程都知道要访问它们 这部分工作正常 我遇到问题的地方是当它们
  • Spark.read 在 Databricks 中给出 KrbException

    我正在尝试从 databricks 笔记本连接到 SQL 数据库 以下是我的代码 jdbcDF spark read format com microsoft sqlserver jdbc spark option url jdbc sql
  • Python 分析:“‘select.poll’对象的‘poll’方法”是什么?

    我已经使用 python 分析了我的 python 代码cProfile模块并得到以下结果 ncalls tottime percall cumtime percall filename lineno function 13937860 9
  • Pandas 与 Numpy 数据帧

    看这几行代码 df2 df copy df2 1 df 1 df 1 values 1 df2 ix 0 0 我们的教练说我们需要使用 values属性来访问底层的 numpy 数组 否则我们的代码将无法工作 我知道 pandas Data

随机推荐

  • MongoDb Pipeline Aggregation排序子子文档

    当尝试使用 MongooseJs 在 Mongodb 中按嵌套数组进行排序时 我遇到了一个小问题 a 一个产品包含任务 每个任务又包含子任务 b 任务有一个顺序 每个子任务也有顺序 task order 和 task subtask ord
  • 使用 Javascript 将用户发送至浏览器主页

    是否可以使用 Javascript 获取浏览器的主页 我想在页面上放置一个链接 该链接可以转到浏览器中设置的主页 编辑 简化答案 识别浏览器并 调用window home 适用于所有浏览器 调用window location href 关于
  • Left_join:错误:无法分配大小为“小”Mb 的向量

    我正在处理相当大的数据框 其中一个极端的数据框包含大约 300 000 行和 1 500 个变量 因此 在处理这些数据帧时 我有时会收到错误 Error cannot allocate vector of size x x Gb 大多数情况
  • Jquery 解析 XML

    我想使用 JQuery 读取以下 XML Jquery 应读取 XML 并以 HTML 形式显示以下内容 以下所有内容均应链接 News Articles Destinations Epics Tuesday Night Boulderin
  • Spring、事务、Hibernate 过滤器

    我在 Spring 中使用声明式事务 我有一个带有 事务性 注释的服务层 该服务层调用 DAO 我需要在所有 dao 方法中启用 hibernate 过滤器 我不想每次都显式调用 session enablefilter 那么有没有一种方法
  • long <-> str 二进制转换

    是否有任何库可以将很长的数字转换为字符串 只需复制数据 这些单行代码太慢了 def xlong s return sum ord c lt lt e 8 for e c in enumerate s def xstr x return ch
  • 我可以使用哪些 Solr 分词器和过滤器来进行强大的常规站点搜索?

    我想确保搜索 比如说 I B M 可以通过搜索找到ibm 我还想确保Dismemberment Plan可以通过搜索找到dismember 使用 Solr 我可以在分析和查询时使用什么标记器和过滤器来允许两种结果 对于 IBM gt ibm
  • R、ggplot - 共享相同 y 轴但具有不同 x 轴刻度的图表

    Context 我有一些数据集 变量 我想绘制它们 但我想以紧凑的方式做到这一点 为此 我希望它们共享相同的 y 轴但不同的 x 轴 并且由于分布不同 我希望其中一个 x 轴进行对数缩放 另一个进行线性缩放 Example 假设我有一个长尾
  • 在展会上与 React Native 立即通话

    我正在尝试在没有中间件对话框的情况下立即发起电话呼叫 我用过Linking openUrl 但它不起作用 react native immediate phone call 可以做到这一点 但它需要链接 这是不可能的expo 我能做些什么
  • 在 Visual Studio 2019 中更改 C# 版本

    我正在使用 Visual Studio 2019 我正在尝试更改我的 C 版本 我这样做的原因是我使用的构建服务器使用旧版本的 VS MSBuild 来构建和部署代码 这是我无法控制的 因此我需要使用 C 5 在 Visual Studio
  • 在 Pandas 中使用窗口进行动态离群值检测

    我想实现离群值检测 它将使用一个窗口来检查下一个元素是否是离群值 假设我们在 pd Series 上使用长度为 3 的窗口 如下所示 0 1 2 3 4 我会计算 0 1 2 上的中位数和疯狂值 或平均值和标准差 并检查 3 是否为异常值
  • 为什么非聚集索引扫描比聚集索引扫描更快?

    据我所知 堆表是没有聚集索引的表 没有物理顺序 我有一个包含 120k 行的堆表 扫描 我正在使用此选择 SELECT id FROM scan 如果我为 id 列创建非聚集索引 我得到223 物理读取 如果我删除非聚集索引并更改表以使 i
  • JSF 自定义复合组件与自定义经典组件之间有什么区别

    我想构建一个自定义 JSF 组件 现在我读了一些oracle的文档并看到了一些代码示例 问题是我有点困惑 似乎有两种方法可以使用 JSF 2 0 构建自定义组件 据我了解 自 JSF 2 0 以来 我可以使用这些复合组件来构建我自己的组件
  • OpenGL 中的帧缓冲区对象和像素缓冲区对象有什么区别?

    FBO 和 PBO 有什么区别 我应该使用哪一个来进行离屏渲染 FBO 和 PBO 有什么区别 更好的问题是它们有何相似之处 他们唯一相似的就是他们names A 帧缓冲对象 http www opengl org wiki Framebu
  • vim自动跳到下一行

    vim 中的一个令人沮丧的行为是 当我向右或向左移动光标 分别为 l 或 h 并且我位于行的末尾或开头时 我的光标不会移动到下一行的第一列或上一行的最后一列 有办法改变这种行为吗 您可以使用whichwrap设置使h and l环绕各行的开
  • 使用带有依赖项的 useEffect hook 时何时触发清理函数?

    我正在使用 useEffect 来显示 UI 加载 但仅在 250 毫秒后 它有效 但我真的不明白为什么 特别是 useEffect 如何以及何时调用返回的函数 清除超时 嗯 我不确定这是否完美 有时应该出现 正在加载 消息 但事实并非如此
  • 当使用多个 WHEN MATCHED 语句时,它们是全部执行,还是只执行一个?

    如果 MERGE 语句中有多个 WHEN MATCHED 语句 如果它们为真 它们是否都会执行 我的例子 DECLARE X bit NULL skipping the MERGE statement straight to WHEN MA
  • 分析 Ruby 程序调用的 C 共享库

    我有一个用Ruby和C编写的程序 C部分是一个共享库 它是Ruby程序的扩展 我想使用 gprof 分析我编写的 C 共享库 我像这样编译共享库 gcc I I usr lib ruby 1 8 i486 linux I usr lib r
  • 弱引用的好处

    有人可以解释一下 C 中不同类型引用的主要好处吗 弱引用 软参考 虚拟引用 强有力的参考 我们有一个消耗大量内存的应用程序 我们正在尝试确定这是否是一个需要关注的领域 我相信软引用和幻像引用来自 Java 长弱引用 将 true 传递给 C
  • python中是否可以同时运行多个asyncio?

    根据我得到的解决方案 在 python 中使用 asyncio 运行多个套接字 https stackoverflow com questions 62571622 running multiple sockets using asynci