本地主机上的 Django/Celery 多个队列 - 路由不起作用

2024-02-14

我跟着芹菜docs http://celery.readthedocs.org/en/latest/userguide/routing.html#manual-routing在我的开发机器上定义 2 个队列。

我的芹菜设置:

CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60  # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'fs_feeds',
    },
}

我在项目的 virtualenv 中打开了两个终端窗口,并运行以下命令:

terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n deafult_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker

我得到的是所有任务都由两个队列处理。

我的目标是拥有一个队列来仅处理中定义的一项任务CELERY_ROUTES和默认队列来处理所有其他任务。

我也关注了这个所以问题 https://stackoverflow.com/questions/10079816/route-celery-task-to-specific-queue, rabbitmqctl list_queues回报celery 0,并运行rabbitmqctl list_bindings回报exchange celery queue celery []两次。重新启动兔子服务器没有改变任何东西。


好吧,所以我想通了。以下是我的整个设置、设置以及如何运行 celery,对于那些可能想知道与我的问题相同的事情的人。

Settings

CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1

# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'feeds',
        'routing_key': 'long_tasks',
    },
}

如何运行芹菜?

终端 - 选项卡 1:

celery -A proj worker -Q default -l debug -n default_worker

这将启动第一个工作进程来消耗默认队列中的任务。笔记!-n default_worker对于第一个工作人员来说不是必须的,但如果您有任何其他启动并运行的 celery 实例,则这是必须的。环境-n worker_name是相同的--hostname=default@%h.

终端 - 选项卡 2:

celery -A proj worker -Q feeds -l debug -n feeds_worker

这将启动第二个工作人员,从提要队列中消费任务。注意-n feeds_worker,如果你正在运行-l debug(日志级别=调试),您将看到两个工作人员正在它们之间同步。

终端 - 选项卡 3:

celery -A proj beat -l debug

这将开始节拍,根据您的计划执行任务CELERYBEAT_SCHEDULE。 我不必改变任务,或者CELERYBEAT_SCHEDULE.

例如,这就是我的样子CELERYBEAT_SCHEDULE对于应该进入 feeds 队列的任务:

CELERYBEAT_SCHEDULE = {
    ...
    'update_feeds': {
        'task': 'arena.social.tasks.Update',
        'schedule': crontab(minute='*/6'),
    },
    ...
}

如您所见,无需添加'options': {'routing_key': 'long_tasks'}或指定它应该进入哪个队列。另外,如果您想知道为什么Update是大写的,因为它是一个自定义任务,被定义为celery.Task.

更新芹菜5.0+

Celery 自版本 5 以来进行了一些更改,以下是任务路由的更新设置。

如何创建队列?

Celery 可以自动创建队列。它非常适合简单的情况,其中 celery 的默认路由值就可以了。

task_create_missing_queues=True或者,如果您使用 django 设置并且您正在为所有 celery 配置命名空间CELERY_ key, CELERY_TASK_CREATE_MISSING_QUEUES=True。请注意,它默认处于打开状态。

自动计划任务路由

配置 celery 应用程序后:

celery_app.conf.beat_schedule = {
  "some_scheduled_task": {
    "task": "module.path.some_task",
    "schedule": crontab(minute="*/10"),
    "options": {"queue": "queue1"}
  }
}

自动任务路由

Celery 应用程序仍然需要先配置,然后:

app.conf.task_routes = {
  "module.path.task2": {"queue": "queue2"},
}

手动分配任务

如果您想动态路由任务,则在发送任务时指定队列:

from module import task

def do_work():
  # do some work and launch the task
  task.apply_async(args=(arg1, arg2), queue="queue3")

重新路由的更多详细信息可以在这里找到:https://docs.celeryproject.org/en/stable/userguide/routing.html https://docs.celeryproject.org/en/stable/userguide/routing.html

关于此处的调用任务:https://docs.celeryproject.org/en/stable/userguide/calling.html https://docs.celeryproject.org/en/stable/userguide/calling.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

本地主机上的 Django/Celery 多个队列 - 路由不起作用 的相关文章

  • 如何同时运行多个功能[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我有以下代码 my func1 my func2 my func3 my func4 my func5 是否可以同时计算函数的数据 而
  • 从内存地址创建python对象(使用gi.repository)

    有时我需要调用仅存在于 C 中的 gtk gobject 函数 但返回一个具有 python 包装器的对象 之前我使用过基于 ctypes 的解决方案 效果很好 现在我从 PyGtk import gtk 切换到 GObject intro
  • Python Requests 库重定向新 url

    我一直在浏览 Python 请求文档 但看不到我想要实现的任何功能 在我的脚本中我设置allow redirects True 我想知道该页面是否已重定向到其他内容 新的 URL 是什么 例如 如果起始 URL 为 www google c
  • NSUserNotificationCenter.defaultUserNotificationCenter() 使用 PyInstaller 返回 None

    我正在尝试将通知发送到通知中心 Mac OSX 我正在使用 PyObjC 绑定来使用我们的 python 应用程序中的 cocoa api 我正在使用以下代码片段 import Foundation import objc NSUserNo
  • 会话数据库表清理

    该表是否需要清除或者由 Django 自动处理 Django 不提供自动清除功能 然而 有一个方便的命令可以帮助您手动完成此操作 Django 文档 清除会话存储 https docs djangoproject com en dev to
  • 如何知道python运行脚本的路径?

    sys arg 0 给我 python 脚本 例如 python hello py 返回 sys arg 0 的 hello py 但我需要知道 hello py 位于完整路径中的位置 我怎样才能用Python做到这一点 os path a
  • 列表推导式和 for 循环中的 Lambda 表达式[重复]

    这个问题在这里已经有答案了 我想要一个 lambda 列表 作为一些繁重计算的缓存 并注意到这一点 gt gt gt j for j in lambda i for i in range 10 9 9 9 9 9 9 9 9 9 9 Alt
  • multiprocessing.Queue 中的 ctx 参数

    我正在尝试使用 multiprocessing Queue 模块中的队列 实施 https docs python org 3 4 library multiprocessing html exchang objects Between p
  • Python 在哪些系统上不使用 IEEE-754 双精度浮点数

    Python 对 IEEE 754 浮点运算进行了各种引用 但不保证1 https docs python org 3 tutorial floatingpoint html 2 https pythondev readthedocs io
  • 使用 python 脚本更改 shell 中的工作目录

    我想实现一个用户态命令 它将采用其参数之一 路径 并将目录更改为该目录 程序完成后 我希望 shell 位于该目录中 所以我想实施cd命令 但需要外部程序 可以在 python 脚本中完成还是我必须编写 bash 包装器 Example t
  • 将图与热图(可能是对数)配对?

    How to create a pair plot in Python like the following but with heat maps instead of points or instead of a hex bin plot
  • Django - 在长时间处理期间显示加载消息

    我怎样才能显示请稍等从 django 视图加载消息 我有一个 Django 视图 需要花费大量时间对大型数据集执行计算 当进程加载时 我想向用户呈现一条反馈消息 例如 旋转加载动画 gif 或类似消息 在尝试了布兰登和穆拉特提出的两种不同方
  • 数据损坏 C++ 和 Python 之间的管道

    我正在编写一些代码 从 Python 获取二进制数据 将其通过管道传输到 C 对数据进行一些处理 在本例中计算互信息度量 然后将结果通过管道传输回 Python 在测试时 我发现如果我发送的数据是一组尺寸小于 1500 X 1500 的 2
  • 将 Django 中的所有视图限制为经过身份验证的用户

    我是 Django 新手 我正在开发一个项目 该项目有一个登录页面作为其索引和一个注册页面 其余页面都必须仅限于登录用户 如果未经身份验证的用户尝试访问这些页面 则必须将他 她重定向到登录页面 我看到 login required装饰器会将
  • 如何检测一个二维数组是否在另一个二维数组内?

    因此 在堆栈溢出成员的帮助下 我得到了以下代码 data needle s which is a png image base64 code goes here decoded data decode base64 f cStringIO
  • 如何在引发异常时将变量传递给异常并在异常时检索它?

    现在我只有一个空白的异常类 我想知道如何在引发变量时给它一个变量 然后在 try except 中处理它时检索该变量 class ExampleException Exception pass 为其构造函数提供一个参数 将其存储为属性 然后
  • tf.print() vs Python print vs tensor.eval()

    看来在Tensorflow中 至少有三种方法可以打印出张量的值 我一直在读here https www freecodecamp org news debugging tensorflow a starter e6668ce72617 an
  • PyQt5按钮lambda变量变成布尔值[重复]

    这个问题在这里已经有答案了 当我运行下面的代码时 它显示如下 为什么 x 不是 x 而是变成布尔值 这种情况仅发生在传递到用 lambda 调用的函数中的第一个参数上 错误的 y home me model some file from P
  • 如何在 Django 中执行 SELECT MAX?

    我有一个对象列表 如何运行查询来给出字段的最大值 我正在使用这段代码 def get best argument self try arg self argument set order by rating 0 details except
  • 如何有效地比较 pandas DataFrame 中的行?

    我有一个 pandas 数据框 其中包含雷击记录以及时间戳和全球位置 格式如下 Index Date Time Lat Lon Good fix 0 1 20160101 00 00 00 9962692 7 1961 60 7604 1

随机推荐