由于该文档仅适用于 JAVA,我无法真正理解它的含义。
它指出 -“虽然 ParDo 始终生成一个主输出 PCollection(作为 apply 的返回值),但您也可以让 ParDo 生成任意数量的附加输出 PCollection。如果您选择有多个输出,您的 ParDo 将返回所有输出 PCollection(包括主输出)捆绑在一起。例如,在 Java 中,输出 PCollections 捆绑在类型安全的 PCollectionTuple 中。
我理解捆绑在一起的含义,但是如果我在 DoFn 中生成一个标签,它是否会生成一个所有其他输出为空的捆绑包,并在代码中遇到它们时生成其他输出?或者它等待所有的产量准备好输入,然后将它们全部打包在一起输出?
文档中对此没有太多说明。虽然我认为它不会等待,只是在遇到时屈服,但我仍然需要了解发生了什么。
回答这个问题的最好方法是举一个例子。这个例子是可用于光束 https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py.
假设您要运行字数统计管道(例如,计算每个单词在文档中出现的次数)。为此,您需要将文件中的行分割成单独的单词。考虑到您还想单独计算单词长度。你的分割变换会像这样:
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText(known_args.input) # Read in the file
# with_outputs allows accessing the explicitly tagged outputs of a DoFn.
split_lines_result = (lines
| beam.ParDo(SplitLinesToWordsFn()).with_outputs(
SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
main='words'))
short_words = split_lines_result['words']
character_count = split_lines_result[
SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]
在这种情况下,每个都是不同的PCollection
,具有正确的元素。这DoFn
将负责分割其输出,并通过标记元素来实现。看:
class SplitLinesToWordsFn(beam.DoFn):
OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
def process(self, element):
# yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
# collection.
yield pvalue.TaggedOutput(
self.OUTPUT_TAG_CHARACTER_COUNT, len(element))
words = re.findall(r'[A-Za-z\']+', element)
for word in words:
# yield word to add it to the main collection.
yield word
正如您所看到的,对于主输出,您不需要标记元素,但对于其他输出则需要标记元素。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)