如何在 DataFlow 作业完成时发出通知

2023-12-10

我想在 GAE 上知道数据流作业何时完成。

我尝试制作以下两条管道

1.

 | 'write to bigquery' >> beam.io.WriteToBigQuery(...)
 | WriteStringsToPubSub('projects/fakeprj/topics/a_topic')

2.

 | 'write to bigquery' >> beam.io.WriteToBigQuery(...)
 | 'DoPubSub' >> beam.ParDo(DoPubSub())   # do Publish using google.cloud.pubsub

但是上面的代码都会产生以下错误:

AttributeError:“PDone”对象没有属性“windowing”

WriteToBigquery 之后如何执行程序?

笔记: 我通过 REST 使用模板执行数据流。 所以,不能使用pipeline_result.wait_until_finish().

Edit.

完整的堆栈就在这里。

File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 327, in <module>
   vital_data_export()
 File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 323, in vital_data_export
   result = p.run()
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 382, in run
   return self.runner.run_pipeline(self)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 285, in run_pipeline
   return_context=True)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 580, in to_runner_api
   root_transform_id = context.transforms.get_id(self._root_transform())
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 810, in to_runner_api
   for part in self.parts],
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in to_runner_api
   for tag, out in self.named_outputs().items()},
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in <dictcomp>
   for tag, out in self.named_outputs().items()},
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 144, in to_runner_api
   self.windowing))
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 128, in windowing
   self.producer.inputs)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\transforms\ptransform.py", line 443, in get_windowing
   return inputs[0].windowing
AttributeError: 'PDone' object has no attribute 'windowing'

在java中,这就是我在数据流管道末尾向PubSub发布“完成”事件的方法,其中管道的输出正在写入BigQuery。希望Python中有等价的东西..

PCollection<TableRow> rows = data.apply("ConvertToTableRow", ParDo.of(new ConvertToRow()));
// Normally this would be the end of the pipeline..
WriteResult writeResult = rows.apply("WriteToBQ", BigQueryIO.writeTableRows().to(...);
// Transformations after this will be done AFTER all rows have been written to BQ
rows.apply(Wait.on(writeResult.getFailedInserts()))
    // Transforms each row inserted to an Integer of value 1
    .apply("OnePerInsertedRow", ParDo.of(new DoFn<TableRow, Integer>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(Integer.valueOf(1));
        }
    }))
    // https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java#L51
    // Combines a PCollection of Integers (all 1's) by summing them. 
    // Outputs a PCollection of one integer element with the sum
    .apply("SumInsertedCounts", Sum.integersGlobally())
    .apply("CountsMessage", ParDo.of(new DoFn<Integer, PubsubMessage>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String messagePayload = "pipeline_completed";
            Map<String, String> attributes = new HashMap<>();
            attributes.put("rows_written", c.element().toString());
            PubsubMessage message = new PubsubMessage(messagePayload.getBytes(), attributes);
            c.output(message);
        }
    }))
    .apply("PublishCompletionMessage", PubsubIO.writeMessages().to(/* output topic */));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 DataFlow 作业完成时发出通知 的相关文章

  • 不断收到错误消息,表示 localhost 的 Server App Engine Standard 无法启动

    当尝试在 Eclipse 中启动 Google 应用程序引擎项目时 我不断收到错误消息 指出本地主机上的服务器应用程序引擎标准无法启动 错误日志如下 以前有人遇到过这个问题吗 问题的原因是什么 WARNING An illegal refl
  • 如何在Google AppEngine上设置环境变量?

    我正在尝试在谷歌应用程序引擎上设置和使用环境变量 我的 app yaml 文件如下所示 但是 当我使用 os Getenv mytoken 时 我得到一个空字符串 而不是我设置的实际值 是GAE的bug吗 api version go1 h
  • Django modelForm 中的文件上传

    我正在尝试在 appengine django 中上传文档 使用纯 django 代码成功上传文档 使用 python manage py runsever 但是当我尝试使用 appengine 项目运行 django 时 它给了我错误 E
  • Dataflow sideInput 可以通过读取 gcs 存储桶来更新每个窗口吗?

    我目前正在创建一个 PCollectionView 方法是从 gcs 存储桶中读取过滤信息 并将其作为侧面输入传递到管道的不同阶段 以过滤输出 如果 gcs 存储桶中的文件发生更改 我希望当前正在运行的管道使用这个新的过滤器信息 如果我的过
  • 如何在 Google App Engine 中为模型定义唯一属性?

    我需要一些独特的属性 我怎样才能实现这个目标 有没有类似的东西unique True 我正在使用适用于 Python 的 Google App Engine Google 提供了执行此操作的函数 http code google com a
  • 使用 Django 在 App Engine 上存储图像

    我正在尝试使用 Django 在 Google App Engine 上的 db BlobProperty 字段中上传并保存调整大小的图像 我认为处理请求的相关部分如下所示 image images resize request POST
  • Google App Engine 和 Git 最佳实践

    我正在 Google App Engine 上开发一个小型宠物项目 我想使用以下命令将源代码置于源代码控制之下github http www github com 这将允许我的朋友检查和修改源代码 我只有一个PetProject包含所有源的
  • Google App Engine 上按 IP 地址划分流量

    我想根据一组已知 IP 地址将流量引导至不同版本的 Google App Engine 代码 例如 如果传入请求来自给定列表上的 IP 地址 则流量将定向至版本 1 如果不是 则定向至版本 2 有没有办法从管理控制台或部署配置中执行此操作
  • 本地主机上的 Google App Engine GQL 查询

    我正在 Google App Engine Windows 上的 SDK 版本 1 7 0 上开发一个应用程序 我需要经常测试该应用程序 并且此测试涉及数据存储上的大量 GQL 查询 您可以在 App Engine 管理界面的浏览器中在线运
  • 按 ListProperty (NDB) 对查询进行排序

    如何按 ListProperty 对查询进行排序 该模型 class Chapter ndb Model title ndb StringProperty required True version ndb IntegerProperty
  • 为什么 Google Cloud Platform App Engine 上的“上次修改时间”不正确?

    我在 Google Cloud Platform App Engine 上多次部署了我的网络 使用设置如下 app yaml runtime nodejs10 当我在本地主机上测试它时 一切都工作正常 但是当我将它部署到谷歌云平台时 响应头
  • 使用 Google App Engine 发送时的 Gmail 发送配额

    Gmail 规定每天 500 个收件人发送电子邮件的配额 如果您通过他们的 POP IMAP 界面发送 则配额仅为每天 100 个收件人 使用 Google 应用引擎在 Gmail 帐户上发送电子邮件的发送配额是多少 它会遵循 POP IM
  • 无法解析 ReferenceProperty -- App Engine

    我遇到了一个错误 无法找出其根本原因 错误如下 ReferenceProperty 无法解析 u StatusLog STATUSLOGSID 此错误仅有时发生 大约一天一次或两次 生成此错误的脚本成功的次数多于失败的次数 该错误最奇怪的事
  • GAE - Eclipse 中的开发服务器未更新?

    我在 Eclipse 上使用 Google AppEngine 开发服务器 我的本地网页似乎没有更新 直到我在开发服务器上进行了多次重新启动 使用 Eclipse 中的 运行 或 调试 按钮 我究竟做错了什么 基本流程是 更改 java 文
  • DatastoreFailureException:内部错误

    刚刚开始收到此错误 没有进行任何代码更改 数据存储上的 GAE J Datanucleus JDO JPA 版本 v1 我在欧盟服务器上的应用程序无法运行 美国目前不受影响 怀疑谷歌内部有什么问题 有人能同意吗 是的 大约 40 分钟前我开
  • 插入对象时,Google Cloud Storage 在 ACL 定义中期望使用哪个域?

    我正在使用谷歌存储 API 来保存和检索部署在 GAE 上的应用程序中的图像 我成功检索了图像 但是当涉及到持久化时 我收到错误 400 详细信息如下 400 OK code 400 errors domain global message
  • 发布到 Chrome 网上应用店时出错

    我在将 Chrome 网上应用店中的演示项目 Google Apps Marketplace 应用程序 发布到我自己的域时遇到错误 我收到以下错误 其 ID 在清单的 api console project id 字段中指定的 API 控制
  • Google Cloud Storage 上的批量重命名对象

    是否可以使用 gsutil 或其他工具 批量重命名 Google Cloud Storage 上的对象 我正在尝试找出一种方法将一堆图像从 JPG 重命名为 jpg 这是在 bash 中执行此操作的本机方法 下面逐行代码进行解释 gsuti
  • 使用 Google App Engine 和 Django 将第三方库 (twilio) 添加到项目中

    每个人 我是这个领域的新手 我使用 django 框架使用 google 应用引擎开发 Web 应用程序 我有一个关于 python lib dir 问题的故障排除 导入错误 没有名为 的模块 我的 appengine config py
  • 谷歌应用程序引擎 urlfetch gzip 到字符串

    使用 Google App Engine 我尝试从包含一个 csv 文件的 URL 中获取 gzip 文件 最终我想在我的网页上输出csv文件的内容 我现在有以下代码 usr bin env python import webapp2 fr

随机推荐