可以使用数据流将 pubsub 消息重复数据删除回 pubsub 吗?

2024-04-01

我有一个将数据写入 Google Cloud pubsub 的应用程序,根据 pubsub 的文档,由于重试机制而导致的重复偶尔可能会发生。还有消息乱序的问题,这在 pubsub 中也得不到保证。

另外,根据文档,可以使用 Google Cloud Dataflow 来删除这些消息的重复数据。

我想让这些消息在消息队列(意味着云 pubsub)中可用,以供服务使用,云数据流似乎确实有一个 pubsubio 编写器,但是您不会回到写入 pubsub 可能会创建的完全相同的问题吗?重复?订单不也是同样的问题吗?如何使用 pubsub (或任何其他系统)按顺序传输消息?

是否可以使用云数据流从一个 pubsub 主题读取并写入另一个 pubsub 并保证不重复?如果不是的话,你会怎么做才能支持相对少量数据的流式传输?

另外,我对 Apache Beam/Cloud Dataflow 非常陌生。如此简单的用例会是什么样子?我想我可以使用 pubsub 本身生成的 ID 进行重复数据删除,因为我让 pubsub 库进行内部重试而不是自己进行重试,因此重试时 ID 应该相同。


Cloud Dataflow / Apache Beam 是 Mac 卡车。它们专为大型数据源/流的并行化而设计。您可以向 PubSub 发送大量数据,但检测重复项并不是 Beam 的工作,因为此任务需要序列化。

读取 PubSub 然后写入不同的主题并不能消除重复的问题,因为您正在写入的新主题上可能会发生重复。此外,队列写入的并行化进一步增加了无序消息的问题。

重复的问题需要在从订阅读取的客户端上解决。一个简单的数据库查询可以让您知道某个项目已经被处理。然后你就丢弃该消息。

处理无序消息也必须设计到您的应用程序中。

PubSub 被设计为一个轻量级廉价的消息队列系统。如果您需要保证消息排序、无重复、先进先出等,您将需要使用不同的解决方案,这当然要昂贵得多。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

可以使用数据流将 pubsub 消息重复数据删除回 pubsub 吗? 的相关文章

随机推荐

  • 找到最小正值

    从固定数量 在本例中为 3 个值中找到最小非零正值或在没有正问题时返回 0 的最佳算法是什么 我的天真的方法如下 在Delphi中 但请随意使用您喜欢的任何方法 但我认为有一种更优雅的方法 value1Temp MaxInt value2T
  • 将多个源 ArrayList 同步到单个目标列表

    需要从两个不同的表加载项目列表 ArrayList 由于 Hibernate 映射复杂 它们无法作为一个 ArrayList 加载 所以它们是由类定义中的两个不同的Hibernate包分别加载的 当我使用它们时 我想合并成一个列表 所以我正
  • 仅在调试模式下使用特定的 minSdkVersion

    如何仅在调试模式下使用特定的 minSdkVersion 我想使用 minSdkVersion 21 进行调试模式 但使用 minSdkVersion 15 进行发布 我不想为此使用香料 因为会带来麻烦 我认为可能是这样的 但不起作用 de
  • 你能在 Perl 中强制刷新输出吗?

    我在 Perl 中有以下两行 print Warning this will overwrite existing files Continue y N n my input
  • 将 SAFEARRAY 从 C++ 返回到 C#

    我有一个 C 方法 可以创建 填充并返回 SAFEARRAY SAFEARRAY TestClass GetResult long size return GetSafeArrayList size How should I export
  • Lua中运算符~=是什么意思?

    什么是 Lua中的运算符是什么意思 例如 在以下代码中 if x params then the is not equals 这在其他语言中是等价的
  • 尝试在Flash AS3.0中使用BindingUtils

    我无法使此代码在包含 Flex SDK 4 0 的 AS3 0 Flash 中工作 import mx binding utils Bindable var myValue int 0 var cw ChangeWatcher Bindin
  • 如何手动向已在 mechanize 中设置了 cookie 的会话添加更多 cookie?

    我有一个 python 脚本 它抓取页面并接收 cookie 我想将另一个 cookie 附加到发送到服务器的现有 cookie 中 这样 在下一个请求中 我将获得原始页面中的 cookie 以及我手动设置的 cookie 无论如何要这样做
  • 更改 axios 的默认基本 url

    我已经像这样配置了我的 axios const axiosConfig baseURL http 127 0 0 1 8000 api timeout 30000 Vue prototype axios axios create axios
  • Android 滚动时如何在 RecyclerView 中加载更多数据

    我想为一个网站开发android应用程序 我阅读了以下网站的帖子json并显示其在RecyclerView每 10 个帖子以及当用户滚动时RecyclerView显示更多 10 条帖子并结束 我是业余爱好者 我写了下面的代码 但我不知道滚动
  • 如何在 blob 中计算情感分析

    我使用以下方法计算 200 个短句子的情绪 我没有使用训练数据集 for sentence in textblob sentences print sentence sentiment 分析返回两个值 极性和主观性 根据我在网上读到的内容
  • Google 文本转语音 API 音调调整

    如何在此代码中将音调调整为 1 20 from google cloud import texttospeech def text to wav voice name text language code join voice name s
  • 单击带有最新更新的 EditText 时应用程序崩溃(gradle 4.4 - android studio 3.1)

    我最近将我的 android studio 更新到版本 3 1 并将我的 gradle 更新到版本 4 4 从那时起 我一直面临着应用程序在点击某个按钮时进入 ANR 的问题EditText这应该会弹出一个软输入键盘 单击EditText我
  • Ajax 调用不会更新我的行

    我想用字符的 id 更新我的数据库 但是当我将它们放入插槽中时 它不会更新我想要更新的行 我的问题是 你能为我指明如何正确编码或修复我的错误的正确方向吗 function updateTeam var team slot if input
  • 在.Net 中使用 Scala 有什么好处?

    Scala 是一种特殊的编程语言 因为它同时针对 JVM 和 CLR 但有什么好处呢 是否值得将其视为 F 语言的可行替代方案 NET 上的 Scala 是 Miguel Garcia 领导的一项持续努力 最新状态是我们几乎能够在 NET
  • While 循环重置 Bash 脚本中的数字变量[重复]

    这个问题在这里已经有答案了 我正在尝试编写一个简单的 bash 脚本来在一组文件夹中的每个文件中执行某些操作 我还喜欢计算脚本读取了多少个文件 但是当脚本通过循环时 数值变量会被重置 我正在使用的代码是这样的 bin bash let AU
  • 有没有办法获取给定的classes.dex 文件中的类名?

    我正在构建一个家庭自动化应用程序 我正在尝试添加一个插件系统 作为测试 我将测试类 Button 的子类 导出为 APK 文件 并将其放入我的应用程序的文件目录中 我能够创建此类的新实例并将其放入我的视图中使用DexClassLoader
  • 绘制直方图的峰值

    我试图弄清楚如何使用绘制简单直方图的峰值scipy signal find peaks但发现的峰值似乎还很遥远 ages np array 10 5 22 13 50 45 67 30 21 34 60 67 89 45 45 65 his
  • HTTP 是否使用 UDP?

    这可能是一个愚蠢的问题 HTTP 是否使用过用户数据报协议 例如 如果使用 HTTP 传输 MP3 或视频 它内部是否使用 UDP 进行传输 From RFC 2616 http www ietf org rfc rfc2616 txt 通
  • 可以使用数据流将 pubsub 消息重复数据删除回 pubsub 吗?

    我有一个将数据写入 Google Cloud pubsub 的应用程序 根据 pubsub 的文档 由于重试机制而导致的重复偶尔可能会发生 还有消息乱序的问题 这在 pubsub 中也得不到保证 另外 根据文档 可以使用 Google Cl