如何使用生成器形成多个管道?

2024-01-26

我正在使用 python,并且正在尝试找到一种将多个生成器优雅地链接在一起的方法。问题的一个例子是,例如,有一个根生成器,它提供某种数据,每个值都像级联一样传递给它的“子级”,而级联反过来可能会修改它们接收的对象。我可以走这条路:

for x in gen1:
    gen2(x)
    gen3(x)

但它很丑而且不优雅。我正在考虑一种更实用的做事方式。


您可以将生成器变成协程,这样它们就可以send()并互相接收值(使用(yield)表达式)。这将使每个人都有机会更改他们收到的值,和/或将它们传递到下一个生成器/协程(或完全忽略它们)。

请注意,在下面的示例代码中,我使用了一个名为coroutine“启动”生成器/协程函数。这使得它们在第一次执行之前就执行yield表达/陈述。这是 YouTube 视频中的一个稍加修改的版本,该视频是一个非常有启发性的演讲,标题为关于协程和并发的有趣课程 https://www.youtube.com/watch?v=Z_OAlIhXziw这是 Dave Beazley 在 PyCon 2009 上发表的。

正如您应该能够从生成的输出中看到的,数据值正在由通过单个配置的每个管道进行处理send()到头部协程,然后头部协程有效地将其“复用”到每个管道中。由于每个子协程也执行此操作,因此可以建立一个复杂的进程“树”。

import sys

def coroutine(func):
    """ Decorator to "prime" generators used as coroutines. """
    def start(*args,**kwargs):
        cr = func(*args,**kwargs)  # Create coroutine generator function.
        next(cr)                   # Advance to just before its first yield.
        return cr
    return start

def pipe(name, value, divisor, coroutines):
    """ Utility function to send values to list of coroutines. """
    print('  {}: {} is divisible by {}'.format(name, value, divisor))
    for cr in coroutines:
        cr.send(value)

def this_func_name():
    """ Helper function that returns name of function calling it. """
    frame = sys._getframe(1)
    return frame.f_code.co_name


@coroutine
def gen1(*coroutines):
    while True:
        value = (yield)     # Receive values sent here via "send()".
        if value % 2 == 0:  # Only pipe even values.
            pipe(this_func_name(), value, 2, coroutines)

@coroutine
def gen2(*coroutines):
    while True:
        value = (yield)     # Receive values sent here via "send()".
        if value % 4 == 0:  # Only pipe values divisible by 4.
            pipe(this_func_name(), value, 4, coroutines)

@coroutine
def gen3(*coroutines):
    while True:
        value = (yield)     # Receive values sent here via "send()".
        if value % 6 == 0:  # Only pipe values divisible by 6.
            pipe(this_func_name(), value, 6, coroutines)

# Create and link together some coroutine pipelines.
g3 = gen3()
g2 = gen2()
g1 = gen1(g2, g3)

# Send values through both pipelines (g1 -> g2, and g1 -> g3) of coroutines.
for value in range(17):
    print('piping {}'.format(value))
    g1.send(value)

Output:

piping 0
  gen1: 0 is divisible by 2
  gen2: 0 is divisible by 4
  gen3: 0 is divisible by 6
piping 1
piping 2
  gen1: 2 is divisible by 2
piping 3
piping 4
  gen1: 4 is divisible by 2
  gen2: 4 is divisible by 4
piping 5
piping 6
  gen1: 6 is divisible by 2
  gen3: 6 is divisible by 6
piping 7
piping 8
  gen1: 8 is divisible by 2
  gen2: 8 is divisible by 4
piping 9
piping 10
  gen1: 10 is divisible by 2
piping 11
piping 12
  gen1: 12 is divisible by 2
  gen2: 12 is divisible by 4
  gen3: 12 is divisible by 6
piping 13
piping 14
  gen1: 14 is divisible by 2
piping 15
piping 16
  gen1: 16 is divisible by 2
  gen2: 16 is divisible by 4
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用生成器形成多个管道? 的相关文章

  • 使用Python开发Web应用程序

    我一直在用 python 做一些工作 但这都是针对独立应用程序的 我很想知道 python 的任何分支是否支持 Web 开发 有人还会建议一个好的教程或网站吗 我可以从中学习一些使用 python 进行 Web 开发的基础知识 既然大家都说
  • Django REST序列化器:创建对象而不保存

    我已经开始使用 Django REST 框架 我想做的是使用一些 JSON 发布请求 从中创建一个 Django 模型对象 然后使用该对象而不保存它 我的 Django 模型称为 SearchRequest 我所拥有的是 api view
  • 将字符串转换为带有毫秒和时区的日期时间 - Python

    我有以下 python 片段 from datetime import datetime timestamp 05 Jan 2015 17 47 59 000 0800 datetime object datetime strptime t
  • Python PAM 模块的安全问题?

    我有兴趣编写一个 PAM 模块 该模块将利用流行的 Unix 登录身份验证机制 我过去的大部分编程经验都是使用 Python 进行的 并且我正在交互的系统已经有一个 Python API 我用谷歌搜索发现pam python http pa
  • Pycharm Python 控制台不打印输出

    我有一个从 Pycharm python 控制台调用的函数 但没有显示输出 In 2 def problem1 6 for i in range 1 101 2 print i end In 3 problem1 6 In 4 另一方面 像
  • 如何打印没有类型的defaultdict变量?

    在下面的代码中 from collections import defaultdict confusion proba dict defaultdict float for i in xrange 10 confusion proba di
  • 打破嵌套循环[重复]

    这个问题在这里已经有答案了 有没有比抛出异常更简单的方法来打破嵌套循环 在Perl https en wikipedia org wiki Perl 您可以为每个循环指定标签 并且至少继续一个外循环 for x in range 10 fo
  • 运行多个 scrapy 蜘蛛的正确方法

    我只是尝试使用在同一进程中运行多个蜘蛛新的 scrapy 文档 http doc scrapy org en 1 0 topics practices html但我得到 AttributeError CrawlerProcess objec
  • Python 中的二进制缓冲区

    在Python中你可以使用StringIO https docs python org library struct html用于字符数据的类似文件的缓冲区 内存映射文件 https docs python org library mmap
  • 在pyyaml中表示具有相同基类的不同类的实例

    我有一些单元测试集 希望将每个测试运行的结果存储为 YAML 文件以供进一步分析 YAML 格式的转储数据在几个方面满足我的需求 但测试属于不同的套装 结果有不同的父类 这是我所拥有的示例 gt gt gt rz shorthand for
  • python 集合可以包含的值的数量是否有限制?

    我正在尝试使用 python 设置作为 mysql 表中 ids 的过滤器 python集存储了所有要过滤的id 现在大约有30000个 这个数字会随着时间的推移慢慢增长 我担心python集的最大容量 它可以包含的元素数量有限制吗 您最大
  • 表达式中的 Python 'in' 关键字与 for 循环中的比较 [重复]

    这个问题在这里已经有答案了 我明白什么是in运算符在此代码中执行的操作 some list 1 2 3 4 5 print 2 in some list 我也明白i将采用此代码中列表的每个值 for i in 1 2 3 4 5 print
  • 如何将 numpy.matrix 提高到非整数幂?

    The 运算符为numpy matrix不支持非整数幂 gt gt gt m matrix 1 0 0 5 0 5 gt gt gt m 2 5 TypeError exponent must be an integer 我想要的是 oct
  • ExpectedFailure 被计为错误而不是通过

    我在用着expectedFailure因为有一个我想记录的错误 我现在无法修复 但想将来再回来解决 我的理解expectedFailure是它会将测试计为通过 但在摘要中表示预期失败的数量为 x 类似于它如何处理跳过的 tets 但是 当我
  • Numpy 优化

    我有一个根据条件分配值的函数 我的数据集大小通常在 30 50k 范围内 我不确定这是否是使用 numpy 的正确方法 但是当数字超过 5k 时 它会变得非常慢 有没有更好的方法让它更快 import numpy as np N 5000
  • Python:计算字典的重复值

    我有一本字典如下 dictA unit1 test1 alpha unit1 test2 beta unit2 test1 alpha unit2 test2 gamma unit3 test1 delta unit3 test2 gamm
  • VSCode:调试配置中的 Python 路径无效

    对 Python 和 VSCode 以及 stackoverflow 非常陌生 直到最近 我已经使用了大约 3 个月 一切都很好 当尝试在调试器中运行任何基本的 Python 程序时 弹出窗口The Python path in your
  • 在 Pandas DataFrame Python 中添加新列[重复]

    这个问题在这里已经有答案了 例如 我在 Pandas 中有数据框 Col1 Col2 A 1 B 2 C 3 现在 如果我想再添加一个名为 Col3 的列 并且该值基于 Col2 式中 如果Col2 gt 1 则Col3为0 否则为1 所以
  • 对输入求 Keras 模型的导数返回全零

    所以我有一个 Keras 模型 我想将模型的梯度应用于其输入 这就是我所做的 import tensorflow as tf from keras models import Sequential from keras layers imp
  • 在python中,如何仅搜索所选子字符串之前的一个单词

    给定文本文件中的长行列表 我只想返回紧邻其前面的子字符串 例如单词狗 描述狗的单词 例如 假设有这些行包含狗 hotdog big dog is dogged dog spy with my dog brown dogs 在这种情况下 期望

随机推荐

  • Conda 不会删除包

    我的命令是否做错了什么 我无法删除 Keras conda remove name myEnv keras Collecting package metadata repodata json done Solving environment
  • CORS:预检通过,主请求完成 w/200,但浏览器仍然存在 Origin 错误

    我正在向运行 Express 的节点服务器发送 CORS ajax 请求 在服务器日志和 js 控制台中 我可以看到预检 OPTIONS 请求成功 然后 主请求也在服务器上成功 并以 200 和我认为正确的标头进行响应 但是 在 Chrom
  • 实施 Sitecore Multisite Robots.txt 文件

    如何为同一 Sitecore 解决方案上托管的每个网站实现不同的 robots txt 文件 我想从 sitecore 项目中动态读取 robots txt 您需要执行以下步骤 1 创建并实现您的自定义通用 ashx 处理程序 2 在 we
  • 滚动到时开始动画

    我花了一整天的时间寻找一种简单的方法来让我的动画在滚动到页面上的特定位置后开始 My css animation width 50 float left position relative webkit animation name tes
  • LaunchD Plist 不工作

    编辑 看起来好像我在控制台中收到错误 com apple launchd com xxxx adbind 57 退出 代码 1 那有什么意思 还 如果我加载使用 launchctl 命令登录的 launchd plist 文件 它工作正常
  • 在 web.config 中存储值 - appSettings 或 configSection - 哪个更有效?

    我正在编写一个可以使用几个不同主题的页面 并且我将在 web config 中存储有关每个主题的一些信息 创建一个新的sectionGroup并将所有内容存储在一起 还是将所有内容都放在appSettings中 是否更有效 配置部分解决方案
  • 如何将现有的 Mercurial 存储库转换为使用子存储库并保持历史记录完整?

    我一直在阅读有关子存储库以及如何使用转换扩展和文件映射将现有文件夹从 Mercurial 存储库提取到子存储库的内容 我可以成功地做到这一点 如果我有以下文件夹结构 C Project Project root txt Project Su
  • 防止所有非系统 shell 扩展加载到 GetOpenFileName、CFileDialog、IFileOpenDialog 等中

    我正在寻找一种编程方式来使用资源管理器 shell 提示用户输入文件名 并且我只希望加载系统 shell 扩展 我寻找此功能的原因是我想消除第 3 方 shell 扩展作为崩溃和其他不确定行为的可能原因 理想情况下 在某个地方有一个我错过的
  • Visual Studio Online 网站以调试模式部署到 Azure

    我创建了一个 ASP NET MVC Web 应用程序项目 将其提交到 Visual Studio Online Git 存储库 并链接到 Windows Azure 网站以进行自动部署 如果所有单元测试都成功 我的项目包含默认的 Web
  • 如何在 Angular2 上使用 onBlur 事件?

    如何检测 Angular2 中的 onBlur 事件 我想用它
  • 在 Perl 中计算字符串中单词数的最快方法是什么?

    我有一些函数在各种文本上运行了超过一百万次 这意味着这些函数的微小改进总体上会带来巨大的收益 目前 我注意到所有涉及字数统计的功能的运行时间都比其他功能要长得多 所以我想尝试以不同的方式进行字数统计 基本上 我的函数所做的就是获取许多具有与
  • 阻止用户输入字母? C++

    你好 我是 C 新手 但我有一个小问题 那就是我必须阻止用户在数字部分输入字母 我做了一次尝试 虽然有效 但很狡猾 因为它将允许用户继续 然后告诉他们出了问题并重新启动应用程序 我如何验证它以显示错误消息 告诉他们这不是数字并让他们重新输入
  • 如何根据对象索引合并两个列表 - 保留属性?

    我想合并两个列表 保留每个对象的索引 mylist lt list 1 NULL 2 otherlist lt list NULL 3 NULL 4 5 6 Desired list 1 3 2 4 5 6 my try suppressW
  • System.IO.IOException:使用 Directory.EnumerateDirectories 时句柄无效

    我有一个窗口服务可以将东西导入到我的系统中 有时我会收到 System IO IOException 句柄无效 有谁知道为什么会出现这种异常 下面你可以看到触发异常的代码 foreach string directoryPath in Di
  • 使用 OSMnx 提取约束多边形

    我正在使用 OSMnx 包来解决以下任务 地图上有一个由纬度和经度定义的点 X 我们需要检测包含该点 X 并受到相邻道路约束的多边形 所以基本上点 X 位于多边形内部 相邻道路将是该多边形的边界 到目前为止 我只设法在地图上绘制图形的可视化
  • 从函数返回后更改 ggplot 对象的点大小

    假设我有一个返回 ggplot 对象的函数 getplot function x rnorm 16 y rnorm 16 dat data frame x y myplot ggplot dat aes x y geom point myp
  • 所有公共结构都会产生对隐式删除的默认构造函数的调用

    我明白什么call to implicitly deleted default constructor意思是但我不明白为什么我会在这里得到它 struct TransformData enum type t kDelay 0 kScale
  • 编写一个可以采用 Int 或 Double 值的 scala 函数

    我编写了一个函数来接受以下类型的值 1 数组 1 0 2 0 3 0 这是一个元组 其中 Int 是第一个值 下一个是双精度数组 我还希望它也接受整数数组 我写的函数如下 def getCountsAndAverages T Paramet
  • C# 使用 NumberLong 将 mongodb bson 转换为 json

    我有一个动态 mongoDB bson 文档 我尝试将其反序列化为 C Dictionary 对象 bson 文档包含 LongNumber 类型 我遇到了麻烦 var json entity BsonValue ToJson JsonCo
  • 如何使用生成器形成多个管道?

    我正在使用 python 并且正在尝试找到一种将多个生成器优雅地链接在一起的方法 问题的一个例子是 例如 有一个根生成器 它提供某种数据 每个值都像级联一样传递给它的 子级 而级联反过来可能会修改它们接收的对象 我可以走这条路 for x