异步队列消费者协程

2024-02-21

我有一个asyncio.Protocol子类从服务器接收数据。 我将这些数据(每行,因为数据是文本)存储在asyncio.Queue.

import asyncio

q = asyncio.Queue()

class StreamProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        for message in data.decode().splitlines():
            yield q.put(message.rstrip())

    def connection_lost(self, exc):
        self.loop.stop()

loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: StreamProtocol(loop),
                              '127.0.0.1', '42')
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

我想要另一个协程负责消耗队列中的数据并处理它。

  • 这应该是一个asyncio.Task?
  • 如果队列因几秒钟内没有收到数据而变空怎么办?我如何确保我的消费者不会停止(run_until_complete)?
  • 有没有比为队列使用全局变量更干净的方法?

这应该是 asyncio.Task 吗?

是的,使用创建它asyncio.ensure_future https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future or 循环创建任务 https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.BaseEventLoop.create_task.

如果队列因几秒钟内没有收到数据而变空怎么办?

只需使用队列.get https://docs.python.org/3/library/asyncio-queue.html#asyncio.Queue.get等待物品可用:

async def consume(queue):
    while True:
        item = await queue.get()
        print(item)

有没有比为队列使用全局变量更干净的方法?

是的,只需将其作为参数传递给消费者协程和流协议:

class StreamProtocol(asyncio.Protocol):
    def __init__(self, loop, queue):
        self.loop = loop
        self.queue = queue

    def data_received(self, data):
        for message in data.decode().splitlines():
            self.queue.put_nowait(message.rstrip())

    def connection_lost(self, exc):
        self.loop.stop()

如何确保我的消费者不会停止(run_until_complete)?

连接关闭后,使用队列.加入 https://docs.python.org/3/library/asyncio-queue.html#asyncio.Queue.join等到队列为空。


完整示例:

loop = asyncio.get_event_loop()
queue = asyncio.Queue()
# Connection coroutine
factory = lambda: StreamProtocol(loop, queue)
connection = loop.create_connection(factory, '127.0.0.1', '42')
# Consumer task
consumer = asyncio.ensure_future(consume(queue))
# Set up connection
loop.run_until_complete(connection)
# Wait until the connection is closed
loop.run_forever()
# Wait until the queue is empty
loop.run_until_complete(queue.join())
# Cancel the consumer
consumer.cancel()
# Let the consumer terminate
loop.run_until_complete(consumer)
# Close the loop
loop.close()

或者,您也可以使用streams https://docs.python.org/3/library/asyncio-stream.html?highlight=streams#tcp-echo-client-using-streams:

async def tcp_client(host, port, loop=None):
    reader, writer = await asyncio.open_connection(host, port, loop=loop)
    async for line in reader:
        print(line.rstrip())
    writer.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_client('127.0.0.1', 42, loop))
loop.close()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

异步队列消费者协程 的相关文章

  • 如何在 Matplotlib 饼图周围绘制箭头以将每个标签指向圆圈中各自的部分?

    我一直在用 Matplotlib 绘制一些图表 我有一个饼图 想要在图表周围绘制箭头 使每个标签都指向图表 我有一个例子 这是我当前的代码 import matplotlib pyplot as plt plt rcParams font
  • pandas DataFrame.join 的运行时间是多少(大“O”顺序)?

    这个问题更具概念性 理论性 与非常大的数据集的运行时间有关 所以我很抱歉没有一个最小的例子来展示 我有一堆来自两个不同传感器的数据帧 我需要最终将它们连接成两个very来自两个不同传感器的大数据帧 df snsr1 and df snsr2
  • 多输出堆叠回归器

    一次性问题 我正在尝试构建一个多输入堆叠回归器 添加到 sklearn 0 22 据我了解 我必须结合StackingRegressor and MultiOutputRegressor 经过多次尝试 这似乎是正确的顺序 import nu
  • 我应该使用 Python 双端队列还是列表作为堆栈? [复制]

    这个问题在这里已经有答案了 我想要一个可以用作堆栈的 Python 对象 使用双端队列还是列表更好 元素数量较少还是数量较多有什么区别 您的情况可能会根据您的应用程序和具体用例而有所不同 但在一般情况下 列表非常适合堆栈 append is
  • 来自 dll 的 Java 调用函数

    我有这个 python 脚本导入zkemkeeperdll 并连接到考勤设备 ZKTeco 这是我正在使用的脚本 from win32com client import Dispatch zk Dispatch zkemkeeper ZKE
  • Django Rest Framework 是否有第三方应用程序来自动生成 swagger.yaml 文件?

    我有大量的 API 端点编写在django rest framework并且不断增加和更新 如何创建和维护最新的 API 文档 我当前的版本是 Create swagger yaml文件并以某种方式在每次端点更改时自动生成 然后使用此文件作
  • 使用主题交换运行多个 Celery 任务

    我正在用 Celery 替换一些自制代码 但很难复制当前的行为 我期望的行为如下 创建新用户时 应向tasks与交换user created路由键 该消息应该触发两个 Celery 任务 即send user activate email
  • 打印数字时添加千位分隔符[重复]

    这个问题在这里已经有答案了 我真的不知道这个问题的 名称 所以它可能是一个不正确的标题 但问题很简单 如果我有一个数字 例如 number 23543 second 68471243 我想要它使print 像这样 23 54368 471
  • 使用 python/numpy 重塑数组

    我想重塑以下数组 gt gt gt test array 11 12 13 14 21 22 23 24 31 32 33 34 41 42 43 44 为了得到 gt gt gt test2 array 11 12 21 22 13 14
  • 使用 Python Oauthlib 通过服务帐户验证 Google API

    我不想使用适用于 Python 的 Google API 客户端库 但仍想使用 Python 访问 Google APIOauthlib https github com idan oauthlib 创建服务帐户后谷歌开发者控制台 http
  • 通过Python连接到Bigquery:ProjectId和DatasetId必须非空

    我编写了以下脚本来通过 SDK 将 Big Query 连接到 Python 如下所示 from google cloud import bigquery client bigquery Client project My First Pr
  • 尽管我已在 python ctypes 中设置了信号处理程序,但并未调用它

    我尝试过使用 sigaction 和 ctypes 设置信号处理程序 我知道它可以与python中的信号模块一起使用 但我想尝试学习 当我向该进程发送 SIGTERM 时 但它没有调用我设置的处理程序 只打印 终止 为什么它不调用处理程序
  • 如何使用 Python 3 检查目录是否包含文件

    我到处寻找这个答案但找不到 我正在尝试编写一个脚本来搜索特定的子文件夹 然后检查它是否包含任何文件 如果包含 则写出该文件夹的路径 我已经弄清楚了子文件夹搜索部分 但检查文件却难倒了我 我发现了有关如何检查文件夹是否为空的多个建议 并且我尝
  • 找到一个数字所属的一组范围

    我有一个 200k 行的数字范围列表 例如开始位置 停止位置 该列表包括除了非重叠的重叠之外的所有类型的重叠 列表看起来像这样 3 5 10 30 15 25 5 15 25 35 我需要找到给定数字所属的范围 并对 100k 个数字重复该
  • 在 Google App Engine 中,如何避免创建具有相同属性的重复实体?

    我正在尝试添加一个事务 以避免创建具有相同属性的两个实体 在我的应用程序中 每次看到新的 Google 用户登录时 我都会创建一个新的播放器 当新的 Google 用户在几毫秒内进行多个 json 调用时 我当前的实现偶尔会创建重复的播放器
  • 如何使用 AWS Lambda Python 读取 AWS S3 存储的 Word 文档(.doc 和 .docx)文件内容?

    我的场景是 我尝试使用 python 实现从 Aws Lambda 读取 AWS 存储的 S3 word 文档 doc 和 docx 文件内容 下面的代码是我使用的 我的问题是我可以获取文件名 但无法读取内容 def lambda hand
  • 每当使用 import cv2 时 OpenCV 都会出错

    我在终端上使用 pip3 install opencv contrib python 安装了 cv2 并且它工作了 但是每当我尝试导入 cv2 或运行导入了 cv2 的 vscode 文件时 在 python IDLE 上它都会说 Trac
  • 在virtualenv中下载sqlite3

    我正在尝试使用命令创建应用程序python3 manage py startapp webapp但我收到一条错误消息 django core exceptions ImproperlyConfigured 加载时出错 pysqlite2 或
  • pandas.read_csv 将列名移动一倍

    我正在使用位于的 ALL zip 文件here http www fec gov disclosurep PDownload do 我的目标是用它创建一个 pandas DataFrame 但是 如果我跑 data pd read csv
  • NLTK:查找单词大小为 2k 的上下文

    我有一个语料库 我有一个词 对于语料库中该单词的每次出现 我想获取一个包含该单词之前的 k 个单词和该单词之后的 k 个单词的列表 我在算法上做得很好 见下文 但我想知道 NLTK 是否提供了一些我错过的功能来满足我的需求 def size

随机推荐