如何在 python apache beam 中展平多个 Pcollection

2024-04-21

应该如何实现位于以下位置的以下逻辑:https://beam.apache.org/documentation/pipelines/design-your-pipeline/ https://beam.apache.org/documentation/pipelines/design-your-pipeline/:

//merge the two PCollections with Flatten//me 
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<String>pCollections());

// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);

由此可以将多个 PCollection 组合成一个 PCollection 在 apache beam python api 中?


您可以使用Flatten https://beam.apache.org/documentation/programming-guide/#core-beam-transforms也变身。例如:

data1 = ['one', 'two', 'three']
data2 = ['four','five']

input1 = p | 'Create PCollection1' >> beam.Create(data1)
input2 = p | 'Create PCollection2' >> beam.Create(data2)

merged = ((input1,input2) | 'Merge PCollections' >> beam.Flatten())

合并的 PCollection 将包含:

INFO:root:one
INFO:root:two
INFO:root:three
INFO:root:four
INFO:root:five

完整代码:

import argparse, logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class LogFn(beam.DoFn):
  """Prints information"""
  def process(self, element):
    logging.info(element)
    return element


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  p = beam.Pipeline(options=pipeline_options)

  data1 = ['one', 'two', 'three']
  data2 = ['four','five']

  input1 = p | 'Create PCollection1' >> beam.Create(data1)
  input2 = p | 'Create PCollection2' >> beam.Create(data2)

  merged = ((input1,input2) | 'Merge PCollections' >> beam.Flatten())

  merged | 'Check Results' >> beam.ParDo(LogFn())

  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 python apache beam 中展平多个 Pcollection 的相关文章

  • 如何查看Databricks中的所有数据库和表

    我想列出 Azure Databricks 中每个数据库中的所有表 所以我希望输出看起来像这样 Database Table name Database1 Table 1 Database1 Table 2 Database1 Table
  • Python 的键盘中断不会中止 Rust 函数 (PyO3)

    我有一个使用 PyO3 用 Rust 编写的 Python 库 它涉及一些昂贵的计算 单个函数调用最多需要 10 分钟 从 Python 调用时如何中止执行 Ctrl C 好像只有执行结束后才会处理 所以本质上没什么用 最小可重现示例 Ca
  • 将数据从 python pandas 数据框导出或写入 MS Access 表

    我正在尝试将数据从 python pandas 数据框导出到现有的 MS Access 表 我想用已更新的数据替换 MS Access 表 在 python 中 我尝试使用 pandas to sql 但收到错误消息 我觉得很奇怪 使用 p
  • 将 Matplotlib 误差线放置在不位于条形中心的位置

    我正在 Matplotlib 中生成带有错误栏的堆积条形图 不幸的是 某些层相对较小且数据多样 因此多个层的错误条可能重叠 从而使它们难以或无法读取 Example 有没有办法设置每个误差条的位置 即沿 x 轴移动它 以便重叠的线显示在彼此
  • 使 django 服务器可以在 LAN 中访问

    我已经安装了Django服务器 可以如下访问 http localhost 8000 get sms http 127 0 0 1 8000 get sms 假设我的IP是x x x x 当我这样做时 从同一网络下的另一台电脑 my ip
  • 如何在flask中使用g.user全局

    据我了解 Flask 中的 g 变量 它应该为我提供一个全局位置来存储数据 例如登录后保存当前用户 它是否正确 我希望我的导航在登录后在整个网站上显示我的用户名 我的观点包含 from Flask import g among other
  • Flask 会话变量

    我正在用 Flask 编写一个小型网络应用程序 当两个用户 在同一网络下 尝试使用应用程序时 我遇到会话变量问题 这是代码 import os from flask import Flask request render template
  • 如何使用Conda下载python包并随后离线安装?

    我知道通过 pip 我可以使用以下命令下载 Python 包 但 pip install 破坏了我的内部包依赖关系 当我做 pip download
  • 如何在 Python 中检索 for 循环中的剩余项目?

    我有一个简单的 for 循环迭代项目列表 在某些时候 我知道它会破裂 我该如何退回剩余的物品 for i in a b c d e f g try some func i except return remaining items if s
  • 根据列值突出显示数据框中的行?

    假设我有这样的数据框 col1 col2 col3 col4 0 A A 1 pass 2 1 A A 2 pass 4 2 A A 1 fail 4 3 A A 1 fail 5 4 A A 1 pass 3 5 A A 2 fail 2
  • 测试 python Counter 是否包含在另一个 Counter 中

    如何测试是否是pythonCounter https docs python org 2 library collections html collections Counter is 包含在另一个中使用以下定义 柜台a包含在计数器中b当且
  • OpenCV 无法从 MacBook Pro iSight 捕获

    几天后 我无法再从 opencv 应用程序内部打开我的 iSight 相机 cap cv2 VideoCapture 0 返回 并且cap isOpened 回报true 然而 cap grab 刚刚返回false 有任何想法吗 示例代码
  • AWS EMR Spark Python 日志记录

    我正在 AWS EMR 上运行一个非常简单的 Spark 作业 但似乎无法从我的脚本中获取任何日志输出 我尝试过打印到 stderr from pyspark import SparkContext import sys if name m
  • 绘制方程

    我正在尝试创建一个函数 它将绘制我告诉它的任何公式 import numpy as np import matplotlib pyplot as plt def graph formula x range x np array x rang
  • 添加不同形状的 numpy 数组

    我想添加两个不同形状的 numpy 数组 但不进行广播 而是将 缺失 值视为零 可能最简单的例子是 1 2 3 2 gt 3 2 3 or 1 2 3 2 1 gt 3 2 3 1 0 0 我事先不知道形状 我正在弄乱每个 np shape
  • Flask如何获取请求的HTTP_ORIGIN

    我想用我自己设置的 Access Control Allow Origin 标头做出响应 而弄清楚请求中的 HTTP ORIGIN 参数在哪里似乎很混乱 我在用着烧瓶 0 10 1 以及HTTP ORIGIN似乎是这个的特点之一object
  • 为字典中的一个键附加多个值[重复]

    这个问题在这里已经有答案了 我是 python 新手 我有每年的年份和值列表 我想要做的是检查字典中是否已存在该年份 如果存在 则将该值附加到特定键的值列表中 例如 我有一个年份列表 并且每年都有一个值 2010 2 2009 4 1989
  • Scrapy:如何使用元在方法之间传递项目

    我是 scrapy 和 python 的新手 我试图将 parse quotes 中的项目 item author 传递给下一个解析方法 parse bio 我尝试了 request meta 和 response meta 方法 如 sc
  • 从列表指向字典变量

    假设你有一个清单 a 3 4 1 我想用这些信息来指向字典 b 3 4 1 现在 我需要的是一个常规 看到该值后 在 b 的位置内读写一个值 我不喜欢复制变量 我想直接改变变量b的内容 假设b是一个嵌套字典 你可以这样做 reduce di
  • NotImplementedError:无法将符号张量 (lstm_2/strided_slice:0) 转换为 numpy 数组。时间

    张量流版本 2 3 1 numpy 版本 1 20 在代码下面 define model model Sequential model add LSTM 50 activation relu input shape n steps n fe

随机推荐

  • 如何使用同一模型的其他字段的值在 django 模型中创建字段?

    我想创建一个字段名称 total其中有所有产品的总价 数量 我想要的是我该怎么做total price quantity在 Django 模型中 正如您所看到的 可以有不止一种产品 我已通过 tabularinline 将 OderItem
  • Selenium Python 等待元素中出现文本错误显示需要 3 个参数 2 个给定

    我正在使用 WebdriverWait 等待网页上的元素中出现某些文本 我正在使用 Selenium 和 Python 我的语法不正确 我收到错误 类型错误 init 恰好需要 3 个参数 给定 2 个 错误跟踪 Traceback mos
  • 使用单个字符串查找多个文件路径

    我尝试编写一个批处理脚本来查找与输入字符串同名的文件的所有路径 现在它只能找到找到的第一个文件 我想不出一种方法让它列出多个文件位置 我经验不足 需要一些帮助 这是脚本代码的一部分 start cls echo Enter file nam
  • SQL-在一个字段中选择与另一字段中记录最高的不同记录

    在我有一个像这样的表的情况下 int id PK int staff id int skill id bit mainskill 我想为每位员工 由 Staff id 表示 仅选择一条记录 列出他们的主要技能 由 mainskill 中的
  • Visual Studio Code 中的 PHP 块快捷方式

    如何在 Visual Studio Code 中打开基本 PHP 块 如下所示 In Sublime Text https en wikipedia org wiki Sublime Text I simply type php and p
  • 时间格式说明(Google Directions API)

    我已阅读用于提出方向请求的 Google Directions API 文档 URL 的示例如下 http maps googleapis com maps api directions json origin Brooklyn desti
  • 在 Laravel 中使用 Socialite 登录后重定向到 URL

    我需要使用以下 URL 注册参加锦标赛 http laravel dev tournaments 1 register 该 URL 位于中间件 auth 中 因此如果用户未登录 他将被重定向到登录页面 我需要的是重定向到 http lara
  • 循环遍历多个 JObject 级别并将信息收集为字符串

    我使用以下代码从 URL 收集 Json 数据 var json new WebClient DownloadString http steamcommunity com id tryhardhusky inventory json 753
  • 使用 gdb 调试时彻底退出 valgrind

    我正在使用 valgrind 和 gdb 调试程序 然而 我以一种野蛮的方式终止了这些调试会话 这真的是它应该做的吗 设置调试会话 按照来自的指示valgrind 官方网站 http valgrind org docs manual man
  • 如何显示文件解压进度?

    我正在尝试找出一种方法来显示当前进度以及解压缩并将 zip 文件的内容写入磁盘的剩余时间 我目前正在使用此处找到的 ZipArchiver 类http code google com p ziparchive http code googl
  • Python pandas 插入长整型

    我正在尝试在 Pandas Dataframe 中插入长整数 import numpy as np from pandas import DataFrame data scores 6311132704823138710 273 26850
  • NEDB 文件存储在哪里?

    var Datastore require nedb db new Datastore filename testdb db autoload true var doc hello world n 5 today new Date nedb
  • 在 Google 电子表格上,如何称呼 IP 的城市、国家/地区?

    我想知道是否有一个公式 脚本可以在 Google 电子表格上使用来获取 IP 地址数组的城市 位置 也就是说 假设 A 列上的每个单元格都有 100 个 IP 地址 我应该在 B 列上使用什么公式 脚本来获取各自的城市和位置 最简单的方法是
  • Qt 调试器在 mac 上使用错误的 python 版本

    我使用的是 macOS Mojave 10 14 6 我的Qt版本是5 13 1 我的 Qt Creator 版本是 4 10 0 当我设置断点并运行应用程序时 调试器永远不会完成并打印到调试器日志并显示以下错误 因此 据我所知 lldb
  • iOS 复制和粘贴

    我正在创建一个应用程序 以便在我在 iOS 设备上复制某些内容时保存我复制的项目 无论如何 我是否可以创建一个事件 以便每当我从 iOS 设备上的任何应用程序复制某些内容时 它都会将其保存到我的应用程序中 我希望它在我复制文本时触发 以便将
  • 是否可以使用前导和跟踪来设置 Android 字体样式?

    android 字体样式中是否可以有以下内容 Leading http en wikipedia org wiki Leading 文本行之间的垂直空间 名称来自于机械印刷过程中用于分隔文本行的物理铅片 Tracking http en w
  • 什么是 Unicode、UTF-8 和 UTF-16?

    Unicode 的基础是什么 为什么需要 UTF 8 或 UTF 16 我在谷歌上研究过这个问题 也在这里搜索过 但我不清楚 In VSS https en wikipedia org wiki Microsoft Visual Sourc
  • cuda 共享内存 - 结果不一致

    我正在尝试并行缩减以对 CUDA 中的数组求和 目前我传递一个数组来存储每个块中元素的总和 这是我的代码 include
  • Log4J 仅将一个类附加到附加程序

    我需要定期轮询正在运行的应用程序的 JVM 内存统计信息 我正在运行一个服务来执行此操作并将统计信息写入根记录器 我对根记录器的使用与否没有太多控制权 我想要做的是将这些日志消息路由到单个附加程序 该附加程序应该只处理来自该类的日志消息 而
  • 如何在 python apache beam 中展平多个 Pcollection

    应该如何实现位于以下位置的以下逻辑 https beam apache org documentation pipelines design your pipeline https beam apache org documentation