The documentation here (https://beam.apache.org/documentation/programming-guide/#additional-outputs), in section 4.5.2, shows a pvalue.TaggedOutput() being yielded.
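pvalue seems hard to import. I copied the import line from the Apache documentation, and I use the --save_main_session option as well as save_main_session=True in def run(), plus pipeline_options.view_as(SetupOptions).save_main_session = save_main_session before I start the pipeline. All imports work in all functions, and all classes work in all functions, except pvalue. I have also tried every possible combination of these settings, including leaving them out. pvalue is always unknown.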
I took all the code from the cookbook here: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
Still, no pvalue.
NameError: name 'pvalue' is not defined [while running 'generatedPtransform-1725']
This error only occurs when I use the DataflowRunner, not when I use the DirectRunner.
My DoFn example
class Splitter(beam.DoFn):
    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        splittertid = element.get('id')
        if splittertid < 100:
            yield pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield pvalue.TaggedOutput(self.TAG2, element)
My run() example
def run(argv=None, save_main_session=True):
    sources = [
        json.loads('{"id":72234,"value":1}'),
        json.loads('{"id":23,"value":2}')
    ]
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    with beam.Pipeline(options=pipeline_options) as p:
        results = (
            p
            | beam.Create(sources)
            # main= dropped: TAG1 is already listed as a tagged output
            | beam.ParDo(Splitter()).with_outputs(Splitter.TAG1, Splitter.TAG2))
** My imports **
from __future__ import absolute_import
import argparse
import logging
import re
import json
import datetime
from jsonschema import validate
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json