将 MutationGroup 流式传输到 Spanner 中

2024-02-03

我正在尝试使用 SpannerIO 将 MutationGroups 流式传输到扳手中。 目标是每 10 秒写入新的 MuationGroup,因为我们将使用 Spanner 来查询近期 KPI。

当我不使用任何 Windows 时,出现以下错误:

Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
    at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:173)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:204)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:120)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1585)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1470)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteGrouped.expand(SpannerIO.java:868)
    at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteGrouped.expand(SpannerIO.java:823)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at quantum.base.transform.entity.spanner.SpannerProtoWrite.expand(SpannerProtoWrite.java:52)
    at quantum.base.transform.entity.spanner.SpannerProtoWrite.expand(SpannerProtoWrite.java:20)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline$Write$SpannerWrite.expand(EntityBuilderPipeline.java:388)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline$Write$SpannerWrite.expand(EntityBuilderPipeline.java:372)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline.main(EntityBuilderPipeline.java:122)
:entityBuilder FAILED

由于上述错误,我假设输入集合需要窗口化并触发,因为 SpannerIO 使用 GroupByKey (这也是我的用例所需要的):

        ...
        .apply("1-minute windows", Window.<MutationGroup>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(Repeatedly.forever(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10))
            ).orFinally(AfterWatermark.pastEndOfWindow()))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply(SpannerIO.write()
                    .withProjectId(entityConfig.getSpannerProject())
                    .withInstanceId(entityConfig.getSpannerInstance())
                    .withDatabaseId(entityConfig.getSpannerDb())
                    .grouped());

当我这样做时,我在运行时遇到以下异常:

java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
        org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:631)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:683)
        com.google.cloud.dataflow.worker.StreamingSideInputFetcher.storeIfBlocked(StreamingSideInputFetcher.java:182)
        com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:71)
        com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:271)
        org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
        org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
        org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
        org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
        org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:145)

经过进一步调查,似乎是由于.apply(Wait.on(input))在 SpannerIO 中:它有一个全局侧输入,似乎不适用于我的固定窗口,如文档所示Wait.java state:

If signal is globally windowed, main input must also be. This typically would be useful
 *       only in a batch pipeline, because the global window of an infinite PCollection never
 *       closes, so the wait signal will never be ready.

作为临时解决方法,我尝试了以下方法:

  • 添加带有触发器的全局窗口而不是固定窗口:

        .apply("globalwindow", Window.<MutationGroup>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(10))
                ).orFinally(AfterWatermark.pastEndOfWindow()))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.ZERO))
    

    这会导致仅当我耗尽管道时才写入扳手。我的印象是Wait.on()信号仅在全局窗口关闭时触发,并且不适用于触发器。

  • 禁用.apply(Wait.on(input))在 SpannerIO 中:

    这会导致管道卡在视图创建上 在此 SO 帖子中进行了描述:SpannerIO Dataflow 2.3.0 卡在 CreateDataflowView 中 https://stackoverflow.com/questions/49273528/spannerio-dataflow-2-3-0-stuck-in-createdataflowview.

    当我检查工作日志以获取线索时,我确实收到以下警告:

    logger:  "org.apache.beam.sdk.coders.SerializableCoder"
    message:  "Can't verify serialized elements of type SpannerSchema have well defined equals method. This may produce incorrect results on some PipelineRunner
    logger:  "org.apache.beam.sdk.coders.SerializableCoder"   
    message:  "Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner"
    

请注意,一切都适用于 DirectRunner,并且我正在尝试使用 DataflowRunner。

对于我可以尝试让它运行的事情,有人有任何其他建议吗?我很难想象我是唯一一个尝试将 MutationGroups 流式传输到扳手中的人。

提前致谢!


目前,Beam Streaming 不支持 SpannerIO 连接器。请关注这个请求请求 https://github.com/apache/beam/pull/6478它增加了对扳手 IO 连接器的流支持。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 MutationGroup 流式传输到 Spanner 中 的相关文章

  • 初始化Object中的空字符串?

    有人使用以下方法来初始化 NSstring NSString astring NSString alloc init 我想知道为什么不直接使用 NSString atring nil or NSString astring 没有semant
  • Aurelia - click.delegate 错误 - 说该函数不是函数

    Novice 我有一个视图模型 它有一个简单地切换值的函数 import autoinject from aurelia framework import bindable from aurelia templating import Lo
  • Mercurial:移植、记录、qrecord、搁置、移植、dirstate、队列

    我是 Mercurial 的新手 并且仍处于评估过程中 因此这四个概念对我来说有点令人困惑 有些被认为相当于 Git 的 Staging Index 概念 有些甚至比 Git 的 Staging 更好 四个命令怎么做hg graft hg
  • Android:列“_id”不存在

    我收到这个错误 IllegalArgumentException 列 id 不存在 当使用SimpleCursorAdapter从我的数据库中检索 该表确实有这个 id柱子 注意到这是一个常见问题 我尝试根据网上的一些解决方案来解决它 但它
  • 如何使用 ROR 设置控制器页面内的文本区域属性[重复]

    这个问题在这里已经有答案了 任何人都可以解决我的小问题吗 实际上我想在页面加载并检查某些条件后设置文本区域启用 让我来解释一下我的故事实际上是我想要的 Story 我有一个博客页面 当用户登录并进入博客页面时 应该启用文本区域来发表评论 如
  • 如何找到 NuGet 包安装的可执行文件?

    我的项目需要打包为zip文件进行部署 我想在构建后步骤中创建此 zip 文件 为了实现这一点 我通过 NuGet 安装了 7 Zip 命令行包 该包提供了一个可执行文件 我想在构建后步骤中调用它 我知道我可以通过提供已安装软件包的路径来调用
  • 错误:运算符不存在:整数 = 字符变化,使用 Postgres 8.2

    我有一个用旧版本的 Eclipse Ganymede 如果我没记错的话 开发的 Java EE Web 应用程序 我最近迁移到 Kubuntu 12 04 LTS 并将应用程序迁移到 Eclipse Kepler 我从 Eclipse 网站
  • android-如何在谷歌地图上将标记的位置显示为地址

    我已经尝试过 commonsware googlemapsv2 教程 特别是在地图上拖动标记 但现在另一个问题困扰着我 问题是如何将标记的当前位置显示为地图下方或上方的地址 字符串 这是我使用的代码 public class MainAct
  • 当我的应用程序运行时,为什么我会在 Android Studio Logcat 中看到“无可调试应用程序”?

    我想过滤 Android Studio Logcat 中显示的内容 以显示仅与我的应用程序相关的内容 我已经选择Show only selected applications从 logcat 顶部的下拉列表中 如以下屏幕截图所示 但是在它上
  • Scala:如何获取数据框中的行范围

    我有一个DataFrame通过运行创建sqlContext readParquet 文件的一个 The DataFrame由 300 M 行组成 我需要使用这些行作为另一个函数的输入 但我想以较小的批次进行操作 以防止 OOM 错误 目前
  • 将redis数据移至MySQL的更快方法

    我们拥有庞大的购物和产品交易系统 我们在 MySQL 方面遇到了很多问题 因此经过几次研发后 我们计划使用 Redis 并开始将 Redis 集成到我们的系统中 继之前直接访问数据库之后 现在我们已经移动了Redis系统 用户购物车详情 关
  • Android 中带有无尽列表视图滚动的 AsyncTask

    我正在创建一个应用程序 其中我需要有无限的滚动列表视图 我不想在我的应用程序中使用任何库 我在网上看到了一些有助于实现此类列表视图的示例 但我的疑问是 当我的数据来自服务器并在异步任务中进行解析时 如何才能拥有无尽的列表视图 如何从滚动异步
  • 为什么用字符串和时间增量转置 DataFrame 会转换数据类型?

    这种行为对我来说似乎很奇怪 id列 字符串 在转置后转换为时间戳df如果另一列是时间增量 import pandas as pd df pd DataFrame id 00115 01222 32333 val 12 14 170 df v
  • 在启动屏幕中执行代码已更新

    在原始启动屏幕中执行代码 https stackoverflow com questions 27642016 execute code in launch screen 现在默认的LaunchScreenXcode 项目中的文件已更改为
  • 如何在reactJS中将一个页面重定向到另一个页面?

    App js 这是按钮点击事件处理 this handleClick this handleClick bind this handleClick e debugger e preventDefault this context route
  • 生成签名和加密的 JWT

    我正在尝试使用生成签名和加密的 JWT 令牌雨云智威汤逊 http connect2id com products nimbus jose jwt private void generateToken throws JOSEExceptio
  • Nodejs Express css和js未加载

    我在nodejs上使用express 我加载的 html 文件没有获取 java 脚本和 css 文件 索引 html
  • 从 pandas 数据框中绘制堆积条形图

    我有数据框 payout df head 10 复制以下 Excel 绘图的最简单 最智能和最快的方法是什么 我尝试过不同的方法 但无法让一切都到位 Thanks 如果您只想要一个堆积条形图 那么一种方法是使用循环来绘制数据框中的每一列 并
  • Google Apps 脚本 - 脚本之间的通信

    Hi我的问题如下 我想创建一个小网页 在该网页上可以在 Google 日历中创建事件 但有一些限制 就我而言 我的室友可以编辑此日历来预订洗衣机 该预订不能重叠 而且我们所有人的使用天数都是有限的 我创造了私人日历 我已经创建了验证请求的脚
  • 独立值不会出现在 moxy - jaxb 生成的 xml 中

    我有一个使用 jaxb 的 moxy 实现生成的 xml 文件 但是独立值不会出现在 xml 文档中 Note 我是EclipseLink JAXB MOXy http www eclipse org eclipselink moxy ph

随机推荐

  • 固定列标题宽度与正文列宽度不匹配

    标题与列宽不对齐 JsFiddle http jsfiddle net DyMSb 1 截屏 http s17 postimg org dybznay9b screen png 我在用着 ajax aspnetcdn com ajax jq
  • 以单下划线或双下划线开头的函数和变量

    我在各种编程语言 PHP 和 Python 中看到过以下划线开头的函数和变量 并且对其背后的含义感到困惑 假设 PHP 中使用正常约定 单下划线表示受保护的成员变量或方法 双下划线表示私有成员变量或方法 这源于当时 PHP 的 OOP 支持
  • 如何让Spring JMS从注释@JmsListener中选择目标队列名称

    任何帮助将不胜感激 我正在尝试使用 spring JMSListener 创建 MDB 的替代品 我希望将目的地名称作为注释传递 但我注意到org springframework jms listener DefaultMessageLis
  • 对象的 JVM 深度内存大小[重复]

    这个问题在这里已经有答案了 据我所知 众所周知的 Instrumentation Java 方法无法正确计算对象的深度大小 是否有可靠的方法在 JVM 上计算对象的正确深度大小 我正在考虑的用例是固定 或上限 内存大小的数据结构 即缓存 注
  • 如何显示多个 YouTube 视频而不重叠音频

    我有一个包含一些 YouTube 视频嵌入代码的页面 当用户在一个视频上单击 gt 播放 时 页面上的所有其他视频都需要暂停 否则它们的音频会与刚刚播放的新视频重叠 实现这一点最有效的方法是什么 好吧 这是我根据其他人的一些代码提出的解决方
  • postgreSQL中的@@Fetch_status

    我正在将数据库从 MS SQL Server 传输到 PostgreSQL 但此触发器有问题 CREATE TRIGGER added clients ON client FOR INSERT AS BEGIN DECLARE cursor
  • 如何强类型组合 mixin?

    我正在尝试使用函数组合通过 mixin 向对象添加行为 const pipe funcs args any gt any gt initial any gt funcs reduce object fn gt fn object initi
  • 快速,将文件发送到服务器

    我正在学习 swift 我使用下面的代码向服务器发送请求 它适用于简单的请求 我从服务器得到响应 我的问题是我无法将文件发送到服务器 code let parameters parameter let request NSMutableUR
  • 播放后重定向 html5 视频

    我有一个 html 5 视频 我删除了控制按钮并添加了 js 代码 以便用户在单击视频时播放视频 我需要做的是绑定一个额外的脚本 该脚本将在视频播放后重定向页面 而无需重新加载页面 下面是我的js代码 function play var v
  • 如何获取colspan的值

    我尝试过不同的 jQuery 方法 var num this attr colspan text var num this attr colspan val var num this td colspan val var num this
  • 在c#中将字符串转换为十进制

    我在使用decimal parse 将字符串转换为十进制值时遇到一些问题 这是我的代码行 fixPrice decimal Parse mItemParts Groups price Value Replace Replace Replac
  • 开发人员是否需要为在 Windows Azure Marketplace 上发布 SaaS 应用程序付费?

    目前我正在构建一个简单的 SaaS 驱动的 TMS 目的是在 Windows Azure Marketplace 中发布它 我无法找到任何定价 微软是否向开发者收取发布费用 是按月计算的吗 或者 Windows Azure 上托管的所有应用
  • OpenGL-OpenCL 互操作传输时间 + 位图纹理

    两部分问题 我正在开展一个学校项目 使用生命游戏作为实验 gpgpu 的工具 我使用 OpenCL 和 OpenGL 进行实时可视化 目标是让这个东西尽可能大 更快 经过分析 我发现帧时间主要由 CL 获取和释放 GL 缓冲区决定 并且时间
  • JavaScript 初学者遇到的引号问题

    我正在尝试从一本书 Jeremy McPeak 的 Beginner JavaScript 中学习 JS 但我坚持使用以下代码 html 中的结果是这样的 56 02 degrees centigrade is 56 as an integ
  • 如何分发带有依赖库的 Mac OS X?

    我有一个程序 特别是我的条目SO DevDays 倒计时应用挑战 https meta stackexchange com questions 20420 countdown app for devdays 21659 21659 它依赖于
  • 基于多个文件的存在激活 Maven 配置文件

    我想根据多个文件的存在来激活配置文件 在下面的示例中 如果两个文件都被激活 我希望配置文件被激活my marker and another marker exists
  • 包恢复失败。回滚包更改

    当我尝试在 VS2017 中为 asp net core 安装任何 nuget 包时 它不断显示每个 包的 包恢复失败 回滚包更改 您可以执行以下步骤 VS Tools Options Nuget 包管理器 General 清除所有 Nug
  • Gradle编译:如何从依赖关系中识别组和模块?

    有时 我不想添加所有依赖项 因此我需要从依赖项中排除一些依赖项 例如 compile com google http client google http client 1 20 0 exclude group org apache htt
  • ODBC Teradata 驱动程序 HY001 内存分配错误。什么意思?

    我正在使用 python 脚本 该脚本使用 teradata python 模块和类似于下面的脚本将一批数据插入 Teradata 它使用 ODBC 连接 偶尔会出现以下错误 HY001 Teradata ODBC Teradata Dri
  • 将 MutationGroup 流式传输到 Spanner 中

    我正在尝试使用 SpannerIO 将 MutationGroups 流式传输到扳手中 目标是每 10 秒写入新的 MuationGroup 因为我们将使用 Spanner 来查询近期 KPI 当我不使用任何 Windows 时 出现以下错