NameError:名称“pvalue”未定义

2024-02-19

在此处的文档中(https://beam.apache.org/documentation/programming-guide/#additional-outputs https://beam.apache.org/documentation/programming-guide/#additional-outputs)在 4.5.2 有一个pvalue.TaggedOutput()产生了。

The pvalue似乎很难导入,我从 apache 文档复制了导入行,然后使用--save_main_session选项以及save_main_session=True in the def run()pipeline_options.view_as(SetupOptions).save_main_session = save_main_session在我启动管道之前。所有导入适用于所有函数,所有类适用于所有函数。但不是pvalue。我还尝试了所有这些可能的组合,并将它们排除在外。pvalue始终是未知的。

我从这里的食谱中获取了所有代码:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py

尽管如此,没有 p 值。
NameError: name 'pvalue' is not defined [while running 'generatedPtransform-1725']

仅当我使用 Dataflowrunner 时才会生成此错误,而不是在使用 Directrunner 时生成。

我的 DoFn 示例

class Splitter(beam.DoFn):

    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        splittertid = element.get('id')

        if splittertid < 100:
            yield pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield pvalue.TaggedOutput(self.TAG2, element)

我的 run() 示例

def run(argv=None, save_main_session=True):
    sources = [
        json.loads('{"id":72234,"value":1'),
        json.loads('{"id":23,"value":2}')
        ]

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
           | beam.Create(sources)
           | beam.ParDo(Splitter()).with_outputs(Splitter.TAG1,Splitter.TAG2,main=Splitter.TAG1)

** 我的进口 **

from __future__ import absolute_import

import argparse
import logging
import re
import json
import datetime
from jsonschema import validate

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

你应该尝试导入pvalue在您的类 Splitter 中,因为使用 Apache Beam 时,应在类和函数中声明依赖项。

你的代码应该是这样的

class Splitter(beam.DoFn):
    from apache_beam import pvalue
    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        splittertid = element.get('id')

        if splittertid < 100:
            yield pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield pvalue.TaggedOutput(self.TAG2, element)

您可以使用from apache_beam import pvalue通常使用 Directrunner,因为代码在本地运行;然而,当使用 Dataflowrunner 时,代码应该遵循一个结构来正确处理依赖关系 https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

NameError:名称“pvalue”未定义 的相关文章

随机推荐

  • git 合并不同的存储库?

    我所有的项目都使用 SVN 有时项目 B 是项目 A 的副本 当项目 A 有一般性变更时 我可以使用svn merge A在目录 B 中 它将合并这些更改 现在 如果我想使用 git 我不喜欢将所有项目放在同一个存储库中 因为这样我就必须克
  • WPF:找不到 Microsoft_Windows_Themes

    我在 WPF 应用程序中收到此错误 找不到类型 Microsoft Windows Themes ScrollChrome 验证您没有缺少程序集引用并且所有引用的程序集均已构建 任何想法 确保将此引用添加到控件 页面 资源字典 其他内容的最
  • 读取外部网站提交的 Angular 中的 POST 表单

    我正在开发一个网站 后端使用 Java 前端使用 Angular 有一种情况 一些外部网站可能会使用POST形式向我的网站发送数据 例如 General请求网址 https myangularwebsite 请求方式 POST 请求标头内容
  • 我可以在四元数中切换 X Y Z 吗?

    我有一个 Y 轴向上的坐标系 我需要将其转换为 Z 向上的坐标系 我将旋转存储在四元数中 所以我的问题是 如果我有一个四元数 X Y Z 我可以用 Z 切换 Y 并得到 Z 实际上是 UP 的结果吗 只是交换四元数中的两个轴 不 这不起作用
  • Azure Bicep - 有条件地创建一个秘密

    我正在使用 Bicep 创建一个 KeyVault 并且我想在保管库中创建一个秘密 但前提是还没有给定名称的秘密 检查 KeyVault 是否存在不起作用 因此我现在正在检查特定标签是否存在 创建 Vault 时 我在资源组中写入一个标签
  • 如何查看.RData 文件中的数据?

    我必须加载 isfar RData 文件才能在其他计算中使用它 此处描述并不重要 我想简单地看看 isfar RData 文件中的数据如何 例如它携带什么数字 列 行 首先我加载我的文件 isfar lt load C Users isfa
  • 打印 NSMutableURLRequest 内容

    我想问是否有人尝试过打印 NSMutableURLRequest request 的值 这是我的场景 我已经形成了我的 XML 并尝试使用 Firefox Poster 插件发送它 我成功地处理了有效和无效的内容 所以是时候进入 iOS 了
  • 全面的初学者 virtualenv 教程? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我最近听到有关 virtualenv 的传闻 我很感兴趣 但我所听到的只是一些赞扬 并不清楚它是什么或如何使用它 我正在寻找 理想情况下
  • java中如何调用一个线程在特定时间运行?

    我想让线程在特定的确切时间执行 例如 2012 07 11 13 12 24 和 2012 07 11 15 23 45 我检查了ScheduledExecutorService 但它只支持在第一次运行后的特定时间段后执行 而且我没有任何固
  • UITableView 在动画过程中崩溃,已找到解决方案,但没有找到根本原因,想知道为什么?

    在我的iphone项目中 我总是将UITableView作为IBOutlet插入到视图控制器中 大多数时候它运行良好 但是当popToRootViewControllerAnimated调用动画时会发生随机崩溃 通过僵尸跟踪 发现崩溃是由于
  • 如何构建 JSON 对象?

    目前我通过执行以下操作构建 JSON 对象 users User all users each do user userlist lt lt id gt user id fname gt user fname lname gt user l
  • 使用 C# 和 WIN32 滚动记事本

    我正在尝试使用 C 应用程序滚动记事本窗口 相关的代码块如下 移动 调整窗口大小的调用有效 所以我知道句柄是有效的 请你看看我错过了什么 运行时什么也没有发生 Flags public enum SetWindowPosFlags uint
  • 如何在启动新窗口时关闭当前窗口(在代码中)

    SignInWindow signIn new SignInWindow signIn ShowDialog 上面的代码位于我的 MainWindow 类中 当显示新窗口时 我希望关闭当前窗口 最好的方法是什么 我的应用程序是 C WPF
  • 使用 twitter bootstrap 创建工具提示/弹出窗口后的回调函数?

    我想在使用 twitter bootstrap 创建工具提示或弹出窗口后对其进行操作 据我所知 没有构建方法可以做到这一点 selector popover placement bottom 例如 假设我想将创建的工具提示从计算的位置向上移
  • 计算机视觉的统计帮助[关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在计算机视觉领域做我的毕业项目 我只修了一门讨论非常基本概念的统计学课程 现在我在相当高级的主题上面
  • Android Kotlin 中的 Moshi - ENUM 作为 MutableMap 键在反序列化时转换为 String

    我有一个MutableMap
  • 如何使用接口使用 Kryo 序列化/反序列化对象

    是否可以通过注册接口而不是具体类来使用 Kryo 反序列化 反序列化对象 具体来说 我需要反序列化 Java 7Path对象在接口中定义 我尝试编写一个序列化程序 将路径 URI 保存为字符串 并在读取反序列化期间重新创建它 但事实证明 K
  • Angular 路由器事件名称

    Angular 2 路由器有一个events https angular io docs ts latest api router index Router class html events anchor作为 rxjs 主题的属性 订阅后
  • 何时在 NHibernate / Hibernate OneToMany 关系上使用 inverse=false ?

    我一直在尝试掌握 Hibernate 的逆属性 它似乎只是概念上困难的事情之一 我得到的要点是 当您有一个父实体 例如 Parent 该实体具有使用一对多映射 在映射上设置 inverse true 告诉 Hibernate 另一端 子进程
  • NameError:名称“pvalue”未定义

    在此处的文档中 https beam apache org documentation programming guide additional outputs https beam apache org documentation pro