带有数据流的 Apache Beam Go SDK

2023-12-02

我一直在使用 Go Beam SDK (v2.13.0),但无法获取字数统计示例致力于 GCP 数据流。它进入崩溃循环尝试启动org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness。使用 Direct 运行程序在本地运行时,该示例可以正确执行。

该示例与上面给出的原始示例完全没有修改。

堆栈跟踪是:

org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException: Protocol message had invalid UTF-8. 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException.invalidUtf8(InvalidProtocolBufferException.java:148) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.CodedInputStream$StreamDecoder.readStringRequireUtf8(CodedInputStream.java:2353) 
at org.apache.beam.model.pipeline.v1.RunnerApi$FunctionSpec.<init>(RunnerApi.java:59611) 
at org.apache.beam.model.pipeline.v1.RunnerApi$FunctionSpec.<init>(RunnerApi.java:59572) 
at org.apache.beam.model.pipeline.v1.RunnerApi$FunctionSpec$1.parsePartialFrom(RunnerApi.java:60241) 
at org.apache.beam.model.pipeline.v1.RunnerApi$FunctionSpec$1.parsePartialFrom(RunnerApi.java:60235) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.CodedInputStream$StreamDecoder.readMessage(CodedInputStream.java:2424) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Coder.<init>(RunnerApi.java:27531) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Coder.<init>(RunnerApi.java:27489) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Coder$1.parsePartialFrom(RunnerApi.java:28410) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Coder$1.parsePartialFrom(RunnerApi.java:28404) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Coder$Builder.mergeFrom(RunnerApi.java:28028) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Coder$Builder.mergeFrom(RunnerApi.java:27868) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.CodedInputStream$StreamDecoder.readMessage(CodedInputStream.java:2408) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.MapEntryLite.parseField(MapEntryLite.java:128) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.MapEntryLite.parseEntry(MapEntryLite.java:184) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.MapEntry.<init>(MapEntry.java:106) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.MapEntry.<init>(MapEntry.java:50) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.MapEntry$Metadata$1.parsePartialFrom(MapEntry.java:70) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.MapEntry$Metadata$1.parsePartialFrom(MapEntry.java:64) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.CodedInputStream$StreamDecoder.readMessage(CodedInputStream.java:2424) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Components.<init>(RunnerApi.java:930) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Components.<init>(RunnerApi.java:848) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Components$1.parsePartialFrom(RunnerApi.java:2714) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Components$1.parsePartialFrom(RunnerApi.java:2708) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.CodedInputStream$StreamDecoder.readMessage(CodedInputStream.java:2424) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Pipeline.<init>(RunnerApi.java:2892) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Pipeline.<init>(RunnerApi.java:2850) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Pipeline$1.parsePartialFrom(RunnerApi.java:3981) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Pipeline$1.parsePartialFrom(RunnerApi.java:3975) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:221) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:239) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:244) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.GeneratedMessageV3.parseWithIOException(GeneratedMessageV3.java:311) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Pipeline.parseFrom(RunnerApi.java:3222) 
at org.apache.beam.runners.dataflow.worker.DataflowWorkerHarnessHelper.getPipelineFromEnv(DataflowWorkerHarnessHelper.java:131) 
at org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:59) 

我正在使用中指定的 docker 映像example并且还从我自己的 docker 使用相同的标签(v2.13.0)进行了尝试,但仍然遇到相同的错误。我意识到它还没有准备好投入生产,但我希望样品能够工作。

按照开始的说明,我像这样运行了这项工作:

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://example-bucket/counts \
--runner dataflow \
--project example-project \
--temp_location gs://example-bucket/tmp/ \
--staging_location gs://example-bucket/binaries/ \
--worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515

我再次尝试了入门中提供的 docker,以及使用 v2.13.0 构建的 docker。

我的示例文件 go.mod 是:

module example.org/wordcount

go 1.12

require (
    cloud.google.com/go v0.41.0 // indirect
    github.com/apache/beam v2.13.0+incompatible
    github.com/pkg/errors v0.8.1 // indirect
    golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect
    google.golang.org/grpc v1.22.0 // indirect
)

可能是什么原因造成的?


Dataflow 不正式支持 Apache Beam Go SDK。不过,一些用户已经能够使用它。我怀疑这个版本可能有问题。您也许可以尝试不同的版本。

您可以与其他用户讨论光束邮件列表关于哪些版本适用于他们(尽管不受支持)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

带有数据流的 Apache Beam Go SDK 的相关文章

随机推荐

  • 在 MATLAB 中声明全局变量

    有没有办法在 MATLAB 中声明全局变量 请不要回复 global x y z 因为我也能读书帮助文件 我已经声明了一个全局变量 x 然后做了这样的事情 function x test global x test1 end 函数在哪里te
  • file_get_contents - 无法打开流:HTTP 请求失败! HTTP/1.1 404 未找到

    将我的网站移动到新域后 我在 file get contents 方面遇到了一些奇怪的问题 我必须设置一个新的域和 IP 地址 使用 Plesk 才能使新的 ssl 证书正常工作 现在我的 file get contents 在同一域上调用
  • 数组本身的名称,它存储在哪里

    数组的名称如何存储在内存中 例如 如果我写 char arr 10 数组项从虚拟地址开始存储在内存中 arr 0 这实际上是 arr 的值 但是在哪里arr本身存储 静态多维数组的行也是如此 char arr 10 20 arr 本身以及
  • 如何在 C# 中将此 05:41:33 Apr 23, 2012 PDT 值转换为日期时间? [复制]

    这个问题在这里已经有答案了 可能的重复 使用 PST CEST UTC etc 格式的时区解析 DateTime 如何将 PDT 时间字符串转换为日期时间 我想将此值 05 41 33 Apr 23 2012 PDT 转换为 datetim
  • 如何在没有 libcurl 的情况下用 C 发出 HTTP get 请求?

    我想编写一个 C 程序来生成 Get 请求 而不使用任何外部库 仅使用 C 库和套接字是否可以实现这一点 我正在考虑制作一个 http 数据包 使用正确的格式 并将其发送到服务器 这是唯一可能的方法还是有更好的方法 使用 BSD 套接字 或
  • 主要 Xcode 7 Sprite Kit Atlas 错误

    所以今天我决定开始在 El Capitan 和 iOS 9 上测试我的游戏 这是一个大项目 我已经用业余时间从事了近 2 年的工作 所以我将代码移植到 Swift 2 0 单击运行按钮并祈祷 Apple 没有破坏 Sprite Kit 就像
  • 如何使用 Volley 在 Android 中发送“multipart/form-data”POST

    有谁能够完成发送multipart form data在 Android 中使用 Volley 进行 POST 了吗 我尝试上传没有成功image png使用 POST 请求到我们的服务器 我很好奇是否有人这样做 我相信执行此操作的默认方法
  • Web 浏览器控件的进度条

    如何使用 C 语言在 Windows 应用程序项目中放置和使用 Web 浏览器控件的进度条 看着那 这WebBrowser ProgressChanged event
  • 了解 Java 迭代器

    如果我运行以下代码 它将打印出 3 次重复内容 但是当我删除 while 循环内的 if 语句时 只是为了看看它会迭代多少次 它会启动一个无限循环 这实际上如何hasNext 方法有效吗 我认为这只会迭代 5 次 因为列表中有 5 个项目
  • 使用鼠标和触摸通过 Adorner 进行 WPF 拖放

    我希望这是一个好问题 所以我将详细写下我想要实现的目标 我在互联网上找到的内容 并展示我到目前为止所做的事情以及我尝试过的事情 我需要向我的应用程序添加拖放功能 我有图像 基本上是控件 我想将其拖动到列表框的项目 这是示例用户界面 这是我现
  • 我们如何使用 train_on_batch 执行提前停止?

    我在循环中手动运行纪元 并在循环中进一步嵌套小批量 在每个小批量中 我需要调用train on batch 启用定制模型的训练 是否有手动方法来恢复提前停止的功能 即打破循环 在实践中 提前停止 主要是通过以下方式完成的 1 训练 X ep
  • 扩展 BaseRequestOptions 时注入的依赖项未定义

    我正在延长BaseRequestOptions在 Angular2 中为每个请求添加标头 我也有一个Config提供基于域的键 值对的类 我需要将其注入到我的派生类中 import BaseRequestOptions from angul
  • C# - 异步返回值

    private TaskCompletionSource
  • Xdebug 异常类的方法

    是否可以看到 Xdebug 创建的扩展 Exception 类的方法 我想获取 HTML 格式的堆栈跟踪 因此 在破解之后 没有像 Niels 展示的那样的方法 但有一个名为 exception gt xdebug message 的公共属
  • 添加谷歌服务 - 任务“:app:processDebugResources”执行失败

    我正在尝试按照此网站上的步骤在 Android Studio 中实现 GCM 客户端 在 Android 上实现 GCM 客户端 正如 设置 Google Play 服务 中提到的 我编辑了应用程序的 build gradle 文件 使其看
  • ThreeJS 中的弯曲文本对象

    有this回购协议this例如 它已经有近 2 年历史了 因此不适用于 ThreeJS 的最新版本 我遇到以下错误和警告 error THREE Matrix3 getInverse no longer takes a Matrix4 ar
  • Python - 打印列表中既没有逗号也没有撇号的项目

    我的代码的最小工作示例 Create output data file out data file open output file w out data file write Header n out data file close li
  • 恰好具有 k 个颜色边的生成树

    我有一个连通的无向图 其边为黑色或白色 并且有一个整数 k 我正在尝试编写一个算法来判断是否存在具有正好 k 个黑边的生成树 不一定必须找到实际的树 我使用克鲁斯卡尔算法来查找生成树中黑边的最小和最大可能数量 如果 k 超出此范围 则不存在
  • 手动触发IOptionsMonitor<>.OnChange

    在 ASP NET Core 2 1 中 我使用 IOptionsMonitor 并对其进行设置 以便我可以在更改 appSettings json 文件时成功获取事件 所以这是有效的 我现在想做的是通过代码手动更改选项中的一些值 并触发我
  • 带有数据流的 Apache Beam Go SDK

    我一直在使用 Go Beam SDK v2 13 0 但无法获取字数统计示例致力于 GCP 数据流 它进入崩溃循环尝试启动org apache beam runners dataflow worker DataflowRunnerHarne