Google PubSub 和来自 TOPIC 的重复消息

2023-11-26

如何防止 Google Cloud PubSub 中发生重复消息?

比如说,我有一个处理它所订阅的消息的代码。

假设我有 2 个节点,它们的服务具有相同的代码。

一旦一个节点收到消息但尚未确认,另一个节点将收到相同的消息。这就是问题所在,我们有两个重复的消息.

void messageReceiver(PubsubMessage pubsubMessage, AckReplyConsumer ackReply) {

        submitHandler.handle(toMessage(pubsubMessage))
                .doOnSuccess((response) -> {
                    log.info("Acknowledging the successfully processed message id: {}, response {}", pubsubMessage.getMessageId(), response);
                    ackReply.ack();  // <---- acknowledged
                })
                .doOnError((e) -> {
                    log.error("Not acknowledging due to an exception", e);
                    ackReply.nack();
                })
                .doOnTerminate(span::finish)
                .subscribe();
    }

对此有什么解决办法吗?这是正常行为吗?


Google Cloud Pub/Sub 使用“至少一次”交付。从the docs:

通常,Cloud Pub/Sub 会按消息发布的顺序传递每条消息一次。然而,消息有时可能会无序传送或多次传送。一般来说,要满足多次交付要求您的订户幂等的处理消息时。

这意味着它保证将以 1:N 的方式传递消息,因此,如果您不首先通过其他方式对其进行重复数据删除,则可能会多次获取该消息。您无法定义任何设置来保证恰好一次传送。该文档确实提到您可以使用 Cloud Dataflow 获得您想要的行为PubSubIO, but 该解决方案似乎已被弃用:

您可以使用 Cloud Dataflow 实现 Cloud Pub/Sub 消息流的一次性处理PubsubIO。 PubsubIO 可对自定义消息标识符或由 Cloud Pub/Sub 分配的消息进行重复消息删除。

说这一切,我从来没有actually看到 Google Cloud Pub/Sub 发送了两次消息。您确定这确实是您遇到的问题吗?还是因为您没有在确认截止期限(如上所述,默认为 10 秒)内确认该消息,所以该消息被重新发出。如果您不承认,它将重新发布。从the docs (强调我的):

为单个主题创建订阅。它有几个属性可以在创建时设置或稍后更新,包括:

  • An 确认期限: 如果您的代码在截止日期之前未确认该消息,则会再次发送该消息。默认值为 10 秒。您可以指定的最大自定义截止时间为 600 秒(10 分钟)。

如果是这种情况,只需在截止日期内确认您的消息,您就不会经常看到这些重复内容。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Google PubSub 和来自 TOPIC 的重复消息 的相关文章

随机推荐

  • 无法使用应用程序加载器上传 iOS 应用程序

    厌倦了与苹果公司撞墙 所以我在这里问这个问题 应用程序加载器卡在 正在将包上传到 iTunes Store 显示 37 7 MB 中的 616 字节 0 字节 秒 截屏 它会保持这种状态大约 20 分钟 然后显示一条不确定的错误消息 发生异
  • Minikube 将在本地主机上运行的 MySQL 作为服务公开

    我的机器上运行着 minikube v0 17 1 版本 我想模拟 AWS 中的环境 其中我的 MySQL 实例将位于 Kubernetes 集群之外 基本上 如何将我的机器上运行的本地 MySQL 实例暴露给通过 minikube 运行的
  • 如何设置Y轴的最大值和最小值

    I am using line chart from http www chartjs org 正如您所看到的 Y 轴的最大值 130 和最小值 60 是自动选择的 我希望最大值 500 最小值 0 这可能吗 对于 Chart js V2
  • Python:使用 mysqldb 将 MySQL 表作为字典导入?

    有人知道如何使用 mysqldb 将包含大量行的 MySQL 表转换为 Python 中的字典对象列表吗 我的意思是 将一组包含 a b 和 c 列的 MySQL 行转换为如下所示的 Python 对象 data a A b 2 4 c 3
  • 在 JBoss 中部署的 java servlet 中加载属性文件作为 war

    我在 JBoss 4 0 2 中部署了一个 servlet 作为 war 我有一个已部署应用程序的属性文件 我应该把这个文件放在哪里 jboss server default conf文件夹下的conf目录下 如何以可移植的方式加载该属性文
  • 如何在 Handlebars 中引用当前迭代的对象

    有没有办法获取Handlebars中当前迭代的对象 code 我已经提到过processObject 这是不正确的 这就是我需要更换 解决方案的地方 希望你明白我想说的 的内容objArr可能看起来像 var objArr objField
  • android 获取视频缩略图路径,而不是位图

    是否可以获取视频缩略图 PATH 而不是 Bitmap 对象本身 我知道方法 MediaStore Images Thumbnails queryMiniThumbnail 但由于我使用自己的位图缓存机制 我希望拥有视频缩略图的路径而不是位
  • 在 grails 中插入的 SQL 查询

    如何在 grails 中执行纯 sql 我需要使用 sql 查询在数据库中插入新记录 我们如何在不使用 HQL 和 gorm 关系的情况下实现这一目标 thanks groovy sql Sql 简化了执行 JDBC 查询的细节 在 Gra
  • UIPanGestureRecognizer 有时无法在 iOS 7 上运行

    我收到 iOS 7 用户的间歇性报告称UIPanGestureRecognizer每隔一段时间就会停止处理某些视图 他们应该能够向右 向左滑动视图 但它只是中断并且由于某种未知原因而无法工作 强制退出应用程序并重新启动即可解决问题 这个问题
  • 如何在play框架项目中使用相对路径访问资源文件?

    我的 play 框架项目使用资源文件 例如我在资源目录下创建的 CSV 文件 这些文件是我直接在 play 项目根目录下创建的 因此它与其他目录 如 app public 等 处于同一级别 从 Java 或 Scala 文件中如何打开此类文
  • find 和Where 与关系之间的区别

    我不认为在活动记录和查找数据方面有什么区别 这是我的模型 class User lt ActiveRecord Base has many shows end class Show lt ActiveRecord Base belongs
  • 如何从主方法调用非静态方法? [复制]

    这个问题在这里已经有答案了 例如 我正在尝试做这样的事情 public class Test public static void main String args int arr new int 5 arrPrint arr public
  • 为什么 array.reduce() 从索引 1 开始

    我想知道为什么索引在array reduce 在下面的示例中从 1 而不是 0 开始 11 22 33 44 reduce acc val index gt console log val This outputs 22 33 and 44
  • JavaScript 迭代器

    我在浏览 MDN Mozilla 开发者网络 时遇到了迭代器和生成器 很自然地 我尝试了 Google Chrome v21 页面中给出的代码片段 具体来说 这段代码 var it Iterator lang for var pair in
  • Firebase 使用安全规则删除子项!写入时存在 data.exists

    我有一个用于添加新数据的安全规则 CATEGORIES CATEGORIES write root child USERS auth uid type val admin data exists root child USERS auth
  • Android:SingleClientConnManager 的使用无效:连接仍分配[重复]

    这个问题在这里已经有答案了 可能的重复 使用 HttpRequest execute 时出现异常 SingleClientConnManager 的使用无效 连接仍分配 我在 Android 上工作 我创建了 Http Singleton
  • 使用 SSE 进行高效 4x4 矩阵向量乘法:水平加法和点积 - 有什么意义?

    我正在尝试使用 SSE 找到 4x4 矩阵 M 与向量 u 乘法的最有效实现 我的意思是 Mu v 据我了解 有两种主要方法可以解决此问题 method 1 v1 dot row1 u v2 dot row2 u v3 dot row3 u
  • 如何将java.util.logging发送到log4j?

    我有一个现有的应用程序 它根据 log4j 进行所有日志记录 我们使用许多其他库 它们要么也使用 log4j 要么根据 Commons Logging 进行日志记录 最终在我们的环境中在幕后使用 log4j 我们的依赖项之一甚至针对 slf
  • SQL where in 子句使用 pandas 数据框中的列

    我有一个 pandas 数据框 其中有一列 ID 我需要运行另一个 sql 查询 其 WHERE 子句由上述列中的所有 ID 指定 Ex df1 DataFrame IDs 1 2 3 4 5 6 query Select id SUM r
  • Google PubSub 和来自 TOPIC 的重复消息

    如何防止 Google Cloud PubSub 中发生重复消息 比如说 我有一个处理它所订阅的消息的代码 假设我有 2 个节点 它们的服务具有相同的代码 一旦一个节点收到消息但尚未确认 另一个节点将收到相同的消息 这就是问题所在 我们有两