如何在数据流中正确使用“展平”

2024-05-03

我们的管道如下所示:

GCS(gz 压缩文件)-> ParDo -> BigQuery

我想使用“展平”从 GCS 中提取多个文件作为管道的输入。但它一直因错误而烦恼:

Workflow failed. Causes: (5001e5764f46ac2c): BigQuery creation of import job for table "Impressions_05_2015_denormalized_test" in dataset "CPT_XXXX" in project "gdfp-XXXX" failed. Causes: (5001e5764f46a1cf): Error:
 Message: Load configuration must specify at least one source URI
 HTTP Code: 400

Code:

PCollection<String> file1 = pipeline.apply(TextIO.Read.from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_21.gz").withCompressionType(TextIO.CompressionType.GZIP));
        PCollection<String> file2 = pipeline.apply(TextIO.Read.from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_22.gz").withCompressionType(TextIO.CompressionType.GZIP));
        PCollectionList<String> allFiles = PCollectionList.of(file1).and(file2);
        PCollection<String> inputRead = allFiles.apply(Flatten.<String>pCollections());
inputRead.apply(ParDo.of(transformation)
                .named(String.format("%s-CPT-transform", type))
                .withSideInputs(views))
                .apply(Write.to(getOutputTable(type))
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(WRITE_APPEND)
                        .withSchema(schema)
                        .named(String.format("%s-BQ-write", type)));

作业 ID 示例:2015-05-12_19_54_06-10158770219525037626

我究竟做错了什么?


我没有采用提议的黑客方法(这确实非常粗糙),而是在finishBundle()方法。这将为每个包写入 1 个空行,但我们可以忍受这一点,直到推出修复程序。设置“id”可以更轻松地稍后过滤掉这些行。

此外,这种解决方法/黑客更容易实现:

@Override
public void finishBundle(Context c) throws Exception {
   TableRow workaroundRow = new TableRow();
   workaroundRow.set("id", "workaround_row");
   c.output(workaroundRow); //Workaround to http://goo.gl/CpBxEf
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在数据流中正确使用“展平” 的相关文章

  • 如何将在执行同一数据流管道期间计算的架构写入 BigQuery?

    我的场景是此处讨论的场景的一种变体 如何使用数据流执行期间计算的架构写入 BigQuery https stackoverflow com questions 29440279 how do i write to bigquery usin
  • Google Cloud Dataflow 中的自动缩放功能未按预期工作

    我正在尝试在我的数据流作业中启用自动缩放 如中所述本文 https cloud google com dataflow service dataflow service desc autoscaling 我通过以下代码设置相关算法来做到这一
  • 处理数据流中一对多阶段的正确方法

    我有一个 Java 批处理管道 它遵循以下模式 FileIO ExtractText gt input 1 file output millions of lines of text ProcessData ProcessData 阶段包含
  • 使用 Python 的 Dataflow/Beam 示例

    我正在尝试获取以下项目的样本PCollection在 Dataflow Beam 上使用 Python SDK 虽然没有记录 Sample FixedSizeGlobally n 存在 测试时 它seems返回一个PCollection包含
  • Apache Beam 每用户会话窗口未合并

    我们有一个有用户的应用程序 每个用户每次使用我们的应用程序大约 10 40 分钟 我想根据发生的特定事件 例如 该用户已转换 该用户上次会话出现问题 该用户上次会话成功 在此之后 我想计算每天这些更高级别的事件 但这是一个单独的问题 为此
  • 无法通过在 Apache Beam 中创建模板来按所需顺序运行多个管道

    我有两个独立的管道 分别为 P1 和 P2 根据我的要求 我只需要在 P1 完全完成执行后才运行 P2 我需要通过一个模板完成整个操作 基本上 模板在找到 run 方式 即 p1 run 时就被创建 所以我可以看到 我需要使用两个不同的模板
  • 如何获取当前滑动窗口的最大时间戳

    我正在使用 X 大小和 Y 周期的滑动时间窗口 为了标记每个窗口的输出 我想获取PCollection当前窗口的时间戳 PCollection
  • Dataflow 作业完成时通知 Google PubSub

    有没有办法在 Google Dataflow 作业完成后将消息发布到 Google Pubsub 上 我们需要通知依赖系统传入数据的处理已完成 将数据写入到接收器后 Dataflow 如何发布 EDIT 我们希望在管道完成写入 GCS 后发
  • Google Cloud Dataflow (Python):读取和写入 .csv 文件的函数?

    我无法弄清楚 GCP Dataflow Python SDK 中读取和写入 csv 文件 或任何非 txt 文件 的精确函数 对于BigQuery 我已经弄清楚了以下功能 beam io Read beam io BigQuerySourc
  • Cloud Dataflow 中的作业失败:启用 Dataflow API

    我目前正在尝试将 Dataflow 与 Pub Sub 结合使用 但收到此错误 工作流程失败 原因 6e74e8516c0638ca 刷新您的凭据时出现问题 请检查 1 为您的项目启用Dataflow API 2 您的项目有一个机器人服务帐
  • 可以使用数据流将 pubsub 消息重复数据删除回 pubsub 吗?

    我有一个将数据写入 Google Cloud pubsub 的应用程序 根据 pubsub 的文档 由于重试机制而导致的重复偶尔可能会发生 还有消息乱序的问题 这在 pubsub 中也得不到保证 另外 根据文档 可以使用 Google Cl
  • 旁加载静态数据

    在 ParDo 中处理数据时 我需要使用存储在 Google Cloud Storage 上的 JSON 架构 我想这可能是侧面加载 我读了他们称之为文档的页面 https beam apache org releases pydoc 2
  • 从 Dataflow 进行流式传输时从 BigQuery 删除数据

    从 Apache Beam 管道加载数据时是否可以从 BigQuery 表中删除数据 我们的用例是这样的 我们需要根据时间戳字段 Dataflow 从 Pubsub 主题提取消息的时间 从表中删除 3 天前的数据 是否建议这样做 如果是 有
  • Apache Beam:DoFn 与 PTransform

    Both DoFn and PTransform是一种定义操作的方法PCollection 我们如何知道何时使用哪个 理解它的一个简单方法是类比map f 对于列表 高阶函数map将函数应用于列表的每个元素 返回结果的新列表 您可以将其称为
  • 将新文件添加到 Cloud Storage 时触发 Dataflow 作业

    我想在将新文件添加到存储桶时触发数据流作业 以便处理新数据并将其添加到 BigQuery 表中 我看到云函数可以被触发 https cloud google com functions calling google cloud storag
  • Apache Beam Pipeline 写表后查询表

    我有一个 Apache Beam Dataflow 管道 它将结果写入 BigQuery 表 然后我想查询该表以获取管道的单独部分 但是 我似乎无法弄清楚如何正确设置此管道依赖性 我编写的新表 然后想要查询 与一个单独的表连接以进行某些过滤
  • 如何在欧洲使用 Cloud Dataflow 区域终端节点?

    是否可以将 Google Cloud Platform Dataflow 作业的区域更改为欧洲 我已将管道区域设置为europe west1 d但我无法更改工作本身的区域 我尝试更改管道选项中的区域 但这会导致错误 并且只有默认区域有效 p
  • 将侧输入应用于 Apache Beam 中的 BigQueryIO.read 操作

    有没有办法将侧面输入应用于 Apache Beam 中的 BigQueryIO read 操作 举例来说 我在 PCollection 中有一个值 我想在查询中使用该值从 BigQuery 表中获取数据 使用侧面输入可以吗 或者在这种情况下
  • 使用 GlobalWindow 在 Beam 中进行状态垃圾收集

    Apache Beam 最近推出了状态细胞 https beam apache org blog 2017 02 13 stateful processing html 通过StateSpec和 StateId注释 在 Apache Fli
  • 如何删除 gcloud Dataflow 作业?

    数据流作业在我的仪表板上杂乱无章 我想从我的项目中删除失败的作业 但在仪表板中 我没有看到任何删除数据流作业的选项 我正在寻找至少像下面这样的东西 gcloud beta dataflow jobs delete JOB ID 要删除所有作

随机推荐

  • 以 ASCII 字符串形式获取 MemoryStream 内容的快速方法

    我在 MemoryStream 中有一个 JSON 字符串 我使用以下代码将其作为 ASCII 字符串获取 MemoryStream memstream new MemoryStream Write a JSON string to mem
  • 更改 IIS 主目录路径会触发重新启动吗?

    在 IIS 特别是 6 0 中 在主目录选项卡下 如果我更改本地路径 是否会导致 IIS 重新启动或应用程序池回收 相关的 是否有一个参考概述了 IIS 元数据库的哪些更改将触发重新启动或应用程序池回收 我还没找到这个 更改主目录路径中网站
  • 为什么 Swift UITableViewController 模板在 tableView cellForRowAtIndexPath 方法中使用可选参数?

    如果您创建新的 UITableViewController 类 您将看到重写的注释方法 override func tableView tableView UITableView cellForRowAtIndexPath indexPat
  • EWS 消息跟踪报告

    我一直在研究如何使用 EWS 从交换中获取消息跟踪报告 但似乎无法查明任何内容 我打算构建一个抓取日志文件的应用程序 但如果我可以通过 EWS 来完成它 那对我正在做的事情会更好 有任何想法吗 我终于能够为我的问题创建一个解决方案 我在 C
  • 跨线反映点的算法

    给定一个点 x1 y1 和一条直线的方程 y mx c 我需要一些伪代码来确定反映直线上第一个点的点 x2 y2 花了大约一个小时试图弄清楚但没有运气 请参阅此处的可视化 http www analyzemath com Geometry
  • Emacs:调试Python的方法

    我把这个贴在程序员 stackexchange com https softwareengineering stackexchange com questions 29844 emacs methods for debugging pyth
  • NancyFX:如何检查查询字符串/表单值是否已正确传递给我的处理程序?

    Nancy 通过以下方式将我的查询字符串和表单值传递给我的处理程序dynamic多变的 下面的示例显示了通过 Nancy 请求传递到 POST 处理程序的表单值 例如Request Form xxx Handler Post gt var
  • 我什么时候应该将可选值与 nil 进行比较?

    很多时候 您需要编写如下代码 if someOptional nil do something with the unwrapped someOptional e g someFunction someOptional 这似乎有点冗长 而且
  • 带条件的 Array.join()

    我该如何使用Array join 有条件的函数 例如 var name aa bb var s name join 输出是 aa bb 我想添加一个条件 仅显示不为空的单词 aa bb 您可以使用Array filter https dev
  • ImmutableJS:合并两个对象列表,而不重复它们

    假设我有以下内容 var allFoods Immutable List var frenchFood Immutable List type french fries price 3 type petit gateau price 40
  • collectstatic 无法收集管理静态文件

    当我运行collectstatic并且管理页面没有CSS时 我遇到以下错误 Error IOError Errno 2 没有这样的文件或目录 u usr local lib python2 7 dist packages django co
  • Angular UI-Router 的“ui-sref”未按预期工作

    如果我使用 href UI Router 将按预期工作 但是 如果我使用 ui sref 它不会按预期工作 我对以下示例有两个问题 锚点 link series 2 no param 和 link series 2 one param 不会
  • 如何使用 Powershell 查找组成员身份以包括嵌套组的成员并包括父组的名称

    我正在尝试创建一个组中所有用户的 CSV 以包含嵌套组的成员 并映射每个成员所在的组 我发现以下 powershell 命令几乎可以完成此任务 但我还需要知道嵌套组的每个成员的嵌套组的名称 MembersALL MembersRecursi
  • Android:RunOnUiThread 与 AsyncTask

    我相信 Google 建议开发人员使用 AsyncTask 但是 我想知道它与使用 new Thread 然后调用 RunOnUiThread 在性能和内存效率方面有何不同 使用 RunOnUithread 的示例 some code 1
  • 如何将 CSS 样式应用于四开输出

    我想将样式应用于四开块输出 我做的第一件事就是在类中嵌入一些 CSS 属性 output在四开文档中 然后使用以下内容引用它 r class output output 它有效 但我认为它不是很有效 因为我必须在每个文档中编写它 所以我写了
  • 基于邻近度的负载均衡

    我正在开展一个项目 我们在世界各地拥有大量 目前为 5 台 服务器 客户端通过集中式代理连接到其中一台服务器 我们知道客户的原籍国 但除此之外一无所知 我们完全控制服务器 因此我们可以获得所需的所有信息 我们不控制客户 他们必须按照标准通过
  • spring 调度一个具有固定延迟和初始延迟的作业

    我正在尝试安排方法调用 我想在服务器启动后立即安排此方法调用 然后每 30 秒调用一次 下面的代码 Configuration EnableScheduling EnableTransactionManagement public clas
  • 如何使用IIS压缩所有文件

    IIS compression has been enabled 以下是web config的http压缩标签
  • Symfony 2 登录后重定向

    在 Symfony 2 中 您可以设置注销目标 以便注销后您将被重定向到 main 但是 登录后您将被重定向到 有没有办法为 成功 登录设置目标 firewalls dev pattern profiler wdt css images j
  • 如何在数据流中正确使用“展平”

    我们的管道如下所示 GCS gz 压缩文件 gt ParDo gt BigQuery 我想使用 展平 从 GCS 中提取多个文件作为管道的输入 但它一直因错误而烦恼 Workflow failed Causes 5001e5764f46ac