在 pika / RabbitMQ 中处理长时间运行的任务

2023-12-20

我们正在尝试建立一个基本的定向队列系统,其中生产者将生成多个任务,一个或多个消费者将一次获取一个任务,处理它并确认消息。

问题是,处理过程可能需要 10-20 分钟,而且我们当时没有回复消息,导致服务器与我们断开连接。

这是我们消费者的一些伪代码:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

第一个任务完成后,BlockingConnection 内部深处的某个地方会抛出异常,抱怨套接字已重置。另外,RabbitMQ日志显示消费者因未及时响应而被断开连接(为什么它重置连接而不是发送FIN很奇怪,但我们不会担心这一点)。

我们进行了很多搜索,因为我们相信这是 RabbitMQ 的正常用例(有很多长时间运行的任务应该在许多消费者之间分配),但似乎没有其他人真正遇到过这个问题。最后我们偶然发现了一个线程,建议使用心跳并生成long_running_task()在一个单独的线程中。

于是代码就变成了:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

这似乎可行,但非常混乱。我们是否确定ch对象是线程安全的吗?另外,想象一下long_running_task()正在使用该连接参数将任务添加到新队列(即这个长过程的第一部分已完成,让我们将任务发送到第二部分)。所以,线程正在使用connection目的。该线程安全吗?

更重要的是,这样做的首选方式是什么?我觉得这非常混乱,而且可能不是线程安全的,所以也许我们做得不对。谢谢!


目前,您最好的选择是关闭心跳,如果您阻塞时间过长,这将阻止 RabbitMQ 关闭连接。我正在尝试 pika 的核心连接管理和 IO 循环在后台线程中运行,但它不够稳定,无法发布。

In 鼠兔 v1.1.0 https://pika.readthedocs.io/en/stable/modules/parameters.html这是ConnectionParameters(heartbeat=0)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 pika / RabbitMQ 中处理长时间运行的任务 的相关文章

随机推荐

  • 拥有字符串映射如何将其与给定字符串进行比较

    我们有像 name location 这样的字符串对的映射 unix 就像绝对位置 a lamyfolder 我们得到了一些位置a lamyfolder mysubfolder myfile 如何找到哪个地图位置最适合给定的网址 例如我们有
  • 通过 DispatchGroup 与 DispatchQueue 访问主队列

    我在一个在后台线程上运行的类中使用 DispatchGroup 偶尔需要更新UI 所以调用如下代码 dispatchGroup notify queue main self delegate moveTo sender self locat
  • 在Spring Boot MVC中添加ShallowEtagHeaderFilter

    我正在尝试调整我的应用程序配置以设置 ETag 支持 我刚刚检查过this https stackoverflow com questions 26151057 add a servlet filter in a spring boot a
  • 如何更改Xamarin菜单栏中的后退按钮?

    这就是我所拥有的 这就是我想要得到的 如果导航堆栈中没有页面 则标题图标将位于左上角 否则将有后退箭头和 后退 文本 我没有找到任何自定义它的选项 有可能吗 如果您使用的话 您可以将箭头更改为汉堡包图标MasterPage在导航页面内 De
  • 如何允许外部访问私有 Azure DevOps NuGet 源

    情况如下 DevOps Org A维护私有 NuGet 提要 DevOps Org B需要在其 Pipelines 中使用上述 feed 中的包 目前的解决方案包括 添加用户U from Org B作为客人Org A具有利益相关者角色的 D
  • 恒等函数在哪里以及为什么有用?

    我明白为什么函数组合很重要 它允许从小而简单的函数构建大而复杂的函数 val f A gt B val g B gt C val h f andThen g compose f and g 该成分符合identity and 关联性 law
  • 产品图片不显示 (Woocommerce)

    我的产品图片出现 但当我点击进入产品页面时 图片被隐藏 只有当我点击时才会出现 某些产品会在其他浏览器上显示 某些产品仅在 Microsoft Edge 上显示 Edit The real problem to that was cloud
  • 替换 PHPUnit 方法 `withConsecutive` (在 PHPUnit 10 中废弃)

    作为方法withConsecutive将在 PHPUnit 10 中删除 在 9 6 中已弃用 我需要将此方法的所有出现替换为新代码 尝试寻找一些解决方案 但没有找到任何合理的解决方案 例如 我有一个代码 this gt personSer
  • Django-graphene 同一模型有多种类型

    我有一个相当大的graphene djangoAPI 为两个应用程序提供支持 我限制对某些字段的访问的第一个方法是拥有多个DjangoObjectTypes对于同一型号 并使用fields限制每种类型可以访问哪些字段 示例Organizat
  • 节点应用程序 docker 映像在本地运行并在 Amazon ECS 上失败

    该应用程序可以在本地正常部署和运行很长一段时间 没有出现任何问题 然而 在 Amazon ECS 上 它似乎总是在空闲运行大约 2 30 分钟后崩溃 怎么了 Dockerfile Set the node alpine base image
  • 计算 SPARQL 中的个体数量

    我对 SPARQL 完全陌生 我想计算这个本体中的参与者数量 http data linkedmdb org directory actor http data linkedmdb org directory actor 我尝试了以下方法
  • 为什么 Mercurial 合并时很笨?如何使拉取/合并更改变得更简单?

    我刚刚开始使用 Mercurial 我想我正在尝试做一些非常简单的事情 一些应该非常典型的事情 但我很困惑为什么它如此复杂 以及为什么它不能按应有的方式工作 国际海事组织 我与朋友共享一些存储库 他做了一些更改并检查了几个文件并推送它们 现
  • 如何在asp.net mvc中回发后清除字段?

    我想知道如何在 ASP NET MVC 回发后清除字段 就像现在 当发生验证错误时 字段会保留用户输入的内容 不过 这很好 当没有发生验证错误时 我希望清除所有字段并显示一条消息 所以现在我使用 ViewData 成功显示 但不确定如何清除
  • Kotlin 多平台:JobCancellationException:父作业已完成

    我尝试编写一个使用 ktor 的 kotlin 多平台库 android 和 ios 因此 我在 kotlins 协程方面遇到了一些问题 When writing tests I always get kotlinx coroutines
  • 通过构建管道将 ASP.NET 应用程序部署到 Azure 应用服务

    我继承了一个 ASP NET 4 7 2 应用程序 它在我的计算机上成功运行和启动 我现在尝试通过 Azure DevOps Pipeline 将其部署到 Azure 应用服务 为了尝试做到这一点 我创建了一个 Azure 构建管道 其中包
  • mod_rewrite 在 URL 中带有尾随句点

    我的 Apache 上有一个 RewriteRule 以使 URL 变得友好 RewriteRule log script php u 1 QSA 这使得http example com log 用户名 http example com l
  • gzipped Parquet 文件在 HDFS for Spark 中可拆分吗?

    在互联网上搜索和阅读有关此主题的答案时 我收到了令人困惑的消息 有人可以分享他们的经验吗 我知道 gzipped csv 不是这样的事实 但也许 Parquet 的文件内部结构是这样的 Parquet 与 csv 的情况完全不同 使用 GZ
  • 通过斯坦福解析器提取所有名词、形容词形式和文本

    我试图通过斯坦福解析器从给定文本中提取所有名词和形容词 我当前的尝试是在 Tree Object 的 getChildrenAsList 中使用模式匹配来定位以下内容 NN paper NN algorithm NN information
  • 从 vscode 的集成终端中打开新的集成终端的命令是什么?

    我正在尝试设置一系列任务 每个任务都需要一个终端 为此 我需要使用一个命令从以前的集成终端打开一个新的集成终端选项卡 有没有办法在vs code集成终端中做到这一点 在 mac 中我会使用 open a Terminal 或者类似的东西tt
  • 在 pika / RabbitMQ 中处理长时间运行的任务

    我们正在尝试建立一个基本的定向队列系统 其中生产者将生成多个任务 一个或多个消费者将一次获取一个任务 处理它并确认消息 问题是 处理过程可能需要 10 20 分钟 而且我们当时没有回复消息 导致服务器与我们断开连接 这是我们消费者的一些伪代