Dataflow 作业完成时通知 Google PubSub

2024-03-20

有没有办法在 Google Dataflow 作业完成后将消息发布到 Google Pubsub 上?我们需要通知依赖系统传入数据的处理已完成。将数据写入到接收器后,Dataflow 如何发布?

EDIT:我们希望在管道完成写入 GCS 后发出通知。我们的管道如下所示:


 
Pipeline.create(options)
                .apply(....)
                .apply(AvroIO.Write.named("Write to GCS")
                             .withSchema(Extract.class)
                             .to(options.getOutputPath())
                             .withSuffix(".avro"));
p.run();
  

如果我们在 pipeline.apply(...) 方法之外添加逻辑,我们会在代码完成执行时收到通知,而不是在管道完成时收到通知。理想情况下我们可以添加另一个.apply(...)在 AvroIO 接收器之后并向 PubSub 发布消息。


您有两种选择可以在管道完成时收到通知,然后发布消息 - 或者在管道完成运行后执行您想要的任何操作:

  1. Use the BlockingPipelineRunner。这将运行您的管道同步地 https://cloud.google.com/dataflow/pipelines/specifying-exec-params#blocking-execution.
  2. Use the DataflowPipelineRunner。这将运行您的管道异步地 https://cloud.google.com/dataflow/pipelines/specifying-exec-params#asynchronous-execution。然后,您可以轮询管道的状态,并等待其完成。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Dataflow 作业完成时通知 Google PubSub 的相关文章

  • 计算 Pubsub 主题中未确认消息的数量

    我想在来自 pubsub 主题的所有消息都得到确认后执行一项操作 我尝试使用 Stackdriver 监控 API 来衡量 按云区域细分的未确认消息数 但不了解区域过滤器以及为什么需要它 在哪里可以查看我的主题使用的区域 并且由于某种未知的
  • Dataflow 作业完成时通知 Google PubSub

    有没有办法在 Google Dataflow 作业完成后将消息发布到 Google Pubsub 上 我们需要通知依赖系统传入数据的处理已完成 将数据写入到接收器后 Dataflow 如何发布 EDIT 我们希望在管道完成写入 GCS 后发
  • 安排 Google Cloud Dataflow 作业的最简单方法

    我只需要每天运行一个数据流管道 但在我看来 像 App Engine Cron Service 这样需要构建整个 Web 应用程序的建议解决方案似乎有点太多了 我正在考虑仅从 Compute Engine Linux 虚拟机中的 cron
  • GCP Pub/Sub,如果已经有活动订阅,您可以在新订阅上重播旧消息吗

    在 GCP Pub Sub 中 我有一个主题和一个Subscription1并已开始发布消息 我可以添加另一个订阅吗Subscription2并重播之前发布的旧消息Subscription2被创造了 它会允许吗 卡夫卡允许 在什么时候我会失
  • Cloud SQL 增量到 BigQuery

    我需要针对我正在研究的用例之一提供一些建议 使用案例 我们在 Cloud SQL 中拥有大约 5 10 个表的数据 其中一些被视为查找表 另一些则被视为事务性表 我们需要将其发送到 BigQuery 以生成 3 4 个表 扁平化 嵌套或非规
  • 如何使用 python API 列出所有数据流作业

    我的用例涉及获取项目中存在的所有流数据流作业的作业 ID 并取消它 更新我的数据流作业的源并重新运行它 我正在尝试使用 python 来实现这一点 直到现在我还没有遇到任何有用的文档 我想到使用 python 的库子进程来执行 gcloud
  • 在数据流中运行 Apache Beam Pipeline 时出现 SSLHandshakeException

    我有一个 Apache Beam Pipeline 在 DoFn 步骤之一中 它执行 https 调用 想想 REST API 在我的本地环境中 所有这些都可以在 DirectRun 中正常运行 这是我的本地环境 apache beam 2
  • Apache Beam:DoFn 与 PTransform

    Both DoFn and PTransform是一种定义操作的方法PCollection 我们如何知道何时使用哪个 理解它的一个简单方法是类比map f 对于列表 高阶函数map将函数应用于列表的每个元素 返回结果的新列表 您可以将其称为
  • 错误 403:向 Cloud PubSub 发送测试消息时出错:用户无权执行此操作

    我想设置推送通知手表 但收到错误响应 我需要什么授权 Request Google API client getClient POST request ch curl init https www googleapis com gmail
  • 压缩保存在Google云存储中的文件

    是否可以压缩已保存在 Google 云存储中的文件 这些文件由 Google 数据流代码创建和填充 数据流无法写入压缩文件 但我的要求是将其保存为压缩格式 标准 TextIO Sink 不支持写入压缩文件 因为从压缩文件中读取的可扩展性较差
  • Google Pub/Sub 是否有队列或主题?

    我熟悉 JMS 对 Google Pub Sub 还很陌生 在 JMS 中有 2 个选项 Queue 只有一个消费者可以接受消息 Topic 每个消费者接受来自主题的每条消息 我相信 Google Pub Sub 应该支持这样的东西 但是快
  • 计算一次 GroupBy,然后将其传递给 Google DataFlow (Python SDK) 中的多个转换

    我正在使用适用于 Apache Beam 的 Python SDK 在 Google DataFlow 上运行特征提取管道 我需要运行多个转换 所有这些转换都希望项目按键分组 基于这个答案question https stackoverfl
  • 使用 Apache Beam python 创建 Google 云数据流模板时出现 RuntimeValueProviderError

    我无法使用 python 3 7 暂存云数据流模板 它在一个参数化参数上失败了apache beam error RuntimeValueProviderError RuntimeValueProvider option input typ
  • Spring 与 Apache Beam

    我想将 Spring 与 Apache Beam 结合使用 它将在 Google Cloud Data flow Runner 上运行 数据流作业应该能够在执行管道步骤时使用 Spring 运行时应用程序上下文 我想在 Apache Bea
  • detectorClassPathResourcesToStage - 无法转换 url

    当我在 GCE 中运行 jar 时 出现以下错误 java jar mySimple jar project myProjcet Aug 13 2015 1 22 26 AM com google cloud dataflow sdk ru
  • 使用 GlobalWindow 在 Beam 中进行状态垃圾收集

    Apache Beam 最近推出了状态细胞 https beam apache org blog 2017 02 13 stateful processing html 通过StateSpec和 StateId注释 在 Apache Fli
  • 使用Google Cloud Pub/Sub时如何在AWS/SQS中实现“锁定”功能?

    当您想要在 Google Cloud Pub Sub 之上实现生产者 消费者模式时 您会期望每条消息只能由一个消费者处理 但 Google Cloud Pub Sub 会将每条消息发送给所有订阅者 但AWS SQS具有以下功能可以轻松保证这
  • Dataflow sideInput 可以通过读取 gcs 存储桶来更新每个窗口吗?

    我目前正在创建一个 PCollectionView 方法是从 gcs 存储桶中读取过滤信息 并将其作为侧面输入传递到管道的不同阶段 以过滤输出 如果 gcs 存储桶中的文件发生更改 我希望当前正在运行的管道使用这个新的过滤器信息 如果我的过
  • 使用 Apache Beam 的 Dataflow 批量加载的性能问题

    我正在对数据流批量加载进行性能基准测试 发现与 Bigquery 命令行工具上的相同负载相比 加载速度太慢 文件大小约为 20 MB 包含数百万条记录 我尝试了不同的机器类型并获得了最佳的负载性能n1 highmem 4加载目标 BQ 表的
  • 与谷歌数据流的复杂连接

    我是一个新手 试图了解我们如何将批量 ETL 流程重写到 Google Dataflow 中 我已经阅读了一些文档 运行了一些示例 我建议新的 ETL 流程将由业务事件 即源 PCollection 驱动 这些将触发该特定业务实体的 ETL

随机推荐