如何连接持续生成和使用数据的 asyncio.coroutines?

2024-01-14

我正在尝试学习如何(惯用地)使用Python 3.4asyncio。我最大的障碍是如何“链接”不断消耗数据的协程,用数据更新状态,并允许另一个协程使用该状态。

我期望从这个示例程序中观察到的行为只是定期报告从子进程接收到的数字总和。报告的发生率应大致与Source对象从子进程接收数字。报告函数中的 IO 阻塞不应阻塞子进程的读取。如果报告功能阻塞的时间比从子进程读取迭代的时间长,我不在乎它是否向前跳过或立即报告一堆;但应该有尽可能多的迭代reporter()因为有expect_exact()在足够长的时间范围内。

#!/usr/bin/python3
import asyncio
import pexpect

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)

        p.terminate()

@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source # ???
        print("New sum is: {:d}".format(new_sum))
        # Potentially some other blocking operation
        yield from limited_throughput.write(new_sum)

def main():
    loop = asyncio.get_event_loop()

    source = Source()
    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)

    # Again, not sure what goes here...
    asyncio.async(reporter(source))

    loop.run_until_complete(source.run())
    loop.close()

if __name__ == '__main__':
    main()

这个例子需要pexpect从 git 安装;你可以很容易地更换run() with:

@asyncio.coroutine
def run(self):
    yield from self.flag.wait()

    while self.flag.is_set():
        value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
        self.sum += value

但我感兴趣的真正子进程需要在pty,我认为这意味着提供的子进程传输/协议框架asyncio对此还不够。要点是异步活动的源是一个协程,可以与yield from.

请注意,reporter()此示例中的函数不是有效代码;我的问题是我不知道里面应该放什么。理想情况下我想保留reporter()代码独立于run();本练习的目的是了解如何使用以下组件将更复杂的程序分解为更小的代码单元asyncio.

有没有一种方法可以构建这种行为asyncio module?


中的锁定原语和队列asyncio它本身提供了一些机制来做到这一点。

状况

The asyncio.Condition()提供一种通知条件的方法。当您删除某些事件并不重要时,请使用此选项。

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

        # For consumers
        self.ready = asyncio.Condition()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)
            with (yield from self.ready):
                self.ready.notify_all() # Or just notify() depending on situation

        p.terminate()

    @asyncio.coroutine
    def read(self):
        with (yield from self.ready):
            yield from self.ready.wait()
            return self.sum


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff in here

Queues

The asyncio.Queue()允许您将数据放入队列(LIFO 或 FIFO)中并从中读取其他内容。如果您绝对想响应每个事件,即使您的消费者(及时)落后,也可以使用此选项。请注意,如果您限制队列的大小,并且您的消费者足够慢,那么您的生产者最终将阻塞。

请注意,这允许我们转换sum也到局部变量。

#!/usr/bin/python3
import asyncio
import pexpect

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        # NOTE: self.sum removed!

        # For consumers
        self.output = asyncio.Queue()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        sum = 0

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            sum += int(p.before)
            yield from self.output.put(sum)

        p.terminate()

    @asyncio.coroutine
    def read(self):
        return (yield from self.output.get())

@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff here

请注意,Python 3.4.4 添加task_done() and join()的方法Queue,当您知道消费者已完成(如果适用)时,您可以优雅地完成所有处理。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何连接持续生成和使用数据的 asyncio.coroutines? 的相关文章

  • 使用Python开发Web应用程序

    我一直在用 python 做一些工作 但这都是针对独立应用程序的 我很想知道 python 的任何分支是否支持 Web 开发 有人还会建议一个好的教程或网站吗 我可以从中学习一些使用 python 进行 Web 开发的基础知识 既然大家都说
  • 如何在python中读取多个文件中的文本

    我的文件夹中有许多文本文件 大约有 3000 个文件 每个文件中第 193 行是唯一包含重要信息的行 我如何使用 python 将所有这些文件读入 1 个文本文件 os 模块中有一个名为 list dir 的函数 该函数返回给定目录中所有文
  • DreamPie 不适用于 Python 3.2

    我最喜欢的 Python shell 是DreamPie http dreampie sourceforge net 我想将它与 Python 3 2 一起使用 我使用了 添加解释器 DreamPie 应用程序并添加了 Python 3 2
  • 如何打印没有类型的defaultdict变量?

    在下面的代码中 from collections import defaultdict confusion proba dict defaultdict float for i in xrange 10 confusion proba di
  • 如何在 Sublime Text 2 的 OSX 终端中显示构建结果

    我刚刚从 TextMate 切换到 Sublime Text 2 我非常喜欢它 让我困扰的一件事是默认的构建结果显示在 ST2 的底部 我的程序产生一些很长的结果 显示它的理想方式 如在 TM2 中 是并排查看它们 如何在 Mac 操作系统
  • 更改自动插入 tkinter 小部件的文本颜色

    我有一个文本框小部件 其中插入了三条消息 一条是开始消息 一条是结束消息 一条是在 单位 被摧毁时发出警报的消息 我希望开始和结束消息是黑色的 但被毁坏的消息 参见我在代码中评论的位置 插入小部件时颜色为红色 我不太确定如何去做这件事 我看
  • __del__ 真的是析构函数吗?

    我主要用 C 做事情 其中 析构函数方法实际上是为了销毁所获取的资源 最近我开始使用python 这真的很有趣而且很棒 我开始了解到它有像java一样的GC 因此 没有过分强调对象所有权 构造和销毁 据我所知 init 方法对我来说在 py
  • 如何使用装饰器禁用某些功能的中间件?

    我想模仿的行为csrf exempt see here https docs djangoproject com en 1 11 ref csrf django views decorators csrf csrf exempt and h
  • keras加载模型错误尝试将包含17层的权重文件加载到0层的模型中

    我目前正在使用 keras 开发 vgg16 模型 我用我的一些图层微调 vgg 模型 拟合我的模型 训练 后 我保存我的模型model save name h5 可以毫无问题地保存 但是 当我尝试使用以下命令重新加载模型时load mod
  • NameError:名称“urllib”未定义”

    CODE import networkx as net from urllib request import urlopen def read lj friends g name fetch the friend list from Liv
  • 当玩家触摸屏幕一侧时,如何让 pygame 发出警告?

    我使用 pygame 创建了一个游戏 当玩家触摸屏幕一侧时 我想让 pygame 给出类似 你不能触摸屏幕两侧 的错误 我尝试在互联网上搜索 但没有找到任何好的结果 我想过在屏幕外添加一个方块 当玩家触摸该方块时 它会发出警告 但这花了很长
  • Python:尝试检查有效的电话号码

    我正在尝试编写一个接受以下格式的电话号码的程序XXX XXX XXXX并将条目中的任何字母翻译为其相应的数字 现在我有了这个 如果启动不正确 它将允许您重新输入正确的数字 然后它会翻译输入的原始数字 我该如何解决 def main phon
  • Python - 按月对日期进行分组

    这是一个简单的问题 起初我认为很简单而忽略了它 一个小时过去了 我不太确定 所以 我有一个Python列表datetime对象 我想用图表来表示它们 x 值是年份和月份 y 值是此列表中本月发生的日期对象的数量 也许一个例子可以更好地证明这
  • Python 3 中“map”类型的对象没有 len()

    我在使用 Python 3 时遇到问题 我得到了 Python 2 7 代码 目前我正在尝试更新它 我收到错误 类型错误 map 类型的对象没有 len 在这部分 str len seed candidates 在我像这样初始化它之前 se
  • Python:计算字典的重复值

    我有一本字典如下 dictA unit1 test1 alpha unit1 test2 beta unit2 test1 alpha unit2 test2 gamma unit3 test1 delta unit3 test2 gamm
  • 在 Pandas DataFrame Python 中添加新列[重复]

    这个问题在这里已经有答案了 例如 我在 Pandas 中有数据框 Col1 Col2 A 1 B 2 C 3 现在 如果我想再添加一个名为 Col3 的列 并且该值基于 Col2 式中 如果Col2 gt 1 则Col3为0 否则为1 所以
  • 用于运行可执行文件的python多线程进程

    我正在尝试将一个在 Windows 上运行可执行文件并管理文本输出文件的 python 脚本升级到使用多线程进程的版本 以便我可以利用多个核心 我有四个独立版本的可执行文件 每个线程都知道要访问它们 这部分工作正常 我遇到问题的地方是当它们
  • 在python中,如何仅搜索所选子字符串之前的一个单词

    给定文本文件中的长行列表 我只想返回紧邻其前面的子字符串 例如单词狗 描述狗的单词 例如 假设有这些行包含狗 hotdog big dog is dogged dog spy with my dog brown dogs 在这种情况下 期望
  • Python:元类属性有时会覆盖类属性?

    下面代码的结果让我感到困惑 class MyClass type property def a self return 1 class MyObject object metaclass MyClass a 2 print MyObject
  • 改变字典的哈希函数

    按照此question https stackoverflow com questions 37100390 towards understanding dictionaries 我们知道两个不同的字典 dict 1 and dict 2例

随机推荐