Celery period_task 并行运行多次

2023-12-15

我有一些使用 Celery 线程的非常简单的周期代码;它只是打印“Pre”和“Post”并在中间休眠。它改编自这个 StackOverflow 问题 and 这个链接网站

from celery.task import task
from celery.task import periodic_task
from django.core.cache import cache
from time import sleep
import main
import cutout_score
from threading import Lock

import socket
from datetime import timedelta
from celery.decorators import task, periodic_task

def single_instance_task(timeout):
  def task_exc(func):
    def wrapper(*args, **kwargs):
        lock_id = "celery-single-instance-" + func.__name__
        acquire_lock = lambda: cache.add(lock_id, "true", timeout)
        release_lock = lambda: cache.delete(lock_id)
        if acquire_lock():
            try:
                func()
            finally:
                release_lock()
    return wrapper
  return task_exc

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
@periodic_task(run_every = timedelta(seconds=2))
def test():
    lock_id = "lock"

    # cache.add fails if if the key already exists
    acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)

    if acquire_lock():
        try:
            print 'pre'
            sleep(20)
            print 'post'
        finally:
            release_lock()
        return
    print 'already in use...'

这段代码never prints 'already in use...';当我使用时也会出现同样的现象@single_instance_task装饰师。

你知道出了什么问题吗?

编辑:我简化了问题,以便它不会写入内存(使用全局或 django 缓存);我还是没见过'already in use...'


编辑:当我将以下代码添加到我的 Django settings.py 文件中时(通过将代码更改为https://docs.djangoproject.com/en/dev/topics/cache/一切都按希望进行,但仅当我使用端口 11211 时(奇怪的是,我的服务器位于端口 8000)

CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'LOCATION': [
            '127.0.0.1:11211'
        ]
    }
}

你如何运行 celeryd ?我不熟悉线程选项。

如果它正在运行多进程,那么就没有在工作人员之间共享内存的“全局”变量。

如果您想在所有工作人员之间共享一个计数器,那么我建议您使用cache.incr.

E.g.:

In [1]: from django.core.cache import cache

In [2]: cache.set('counter',0)

In [3]: cache.incr('counter')
Out[3]: 1

In [4]: cache.incr('counter')
Out[4]: 2

Update

如果你通过睡觉强迫你的任务重叠,会发生什么,例如:

print "Task on %r started" % (self,)
sleep(20)
print "Task on %r stopped" % (self,)

如果您没有因运行频率超过 20 秒而得到“已在使用中...”的信息,那么您就知道缓存的行为不符合预期。


另一个更新

您是否在 django 设置中设置了缓存后端?例如。内存缓存

如果没有,您可能正在使用虚拟缓存, 它实际上不做任何缓存,只是实现接口...这听起来像是您的问题的令人信服的原因。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Celery period_task 并行运行多次 的相关文章

  • Python、Tkinter、更改标签颜色

    有没有一种简单的方法来更改按钮中文本的颜色 I use button text input text here 更改按下后按钮文本的内容 是否存在类似的颜色变化 button color red Use the foreground设置按钮
  • 使用 openCV 对图像中的子图像进行通用检测

    免责声明 我是计算机视觉菜鸟 我看过很多关于如何在较大图像中查找特定子图像的堆栈溢出帖子 我的用例有点不同 因为我不希望它是具体的 而且我不确定如何做到这一点 如果可能的话 但我感觉应该如此 我有大量图像数据集 有时 其中一些图像是数据集的
  • 更改自动插入 tkinter 小部件的文本颜色

    我有一个文本框小部件 其中插入了三条消息 一条是开始消息 一条是结束消息 一条是在 单位 被摧毁时发出警报的消息 我希望开始和结束消息是黑色的 但被毁坏的消息 参见我在代码中评论的位置 插入小部件时颜色为红色 我不太确定如何去做这件事 我看
  • Python 多处理示例不起作用

    我正在尝试学习如何使用multiprocessing但我无法让它发挥作用 这是代码文档 http docs python org 2 library multiprocessing html from multiprocessing imp
  • 为 pandas 数据透视表中的每个值列定义 aggfunc

    试图生成具有多个 值 列的数据透视表 我知道我可以使用 aggfunc 按照我想要的方式聚合值 但是如果我不想对两列求和或求平均值 而是想要一列的总和 同时求另一列的平均值 该怎么办 那么使用 pandas 可以做到这一点吗 df pd D
  • 在循环中每次迭代开始时将变量重新分配给原始值(在循环之前定义)

    在Python中 你使用 在每次迭代开始时将变量重新分配给原始值 在循环之前定义 时 也就是说 original 1D o o o for i in range 0 3 new original 1D revert back to orig
  • SQLAlchemy 与 celery 的会话问题

    我已经为我们的网络应用程序安排了一些使用 celerybeat 重复执行的任务 该应用程序本身是使用金字塔网络框架构建的 使用 zopetransaction 扩展来管理会话 在 celery 中 我将该应用程序用作库 我正在使用函数重新定
  • NameError:名称“urllib”未定义”

    CODE import networkx as net from urllib request import urlopen def read lj friends g name fetch the friend list from Liv
  • Python:字符串不会转换为浮点数[重复]

    这个问题在这里已经有答案了 我几个小时前写了这个程序 while True print What would you like me to double line raw input gt if line done break else f
  • Python - 在窗口最小化或隐藏时使用 pywinauto 控制窗口

    我正在尝试做的事情 我正在尝试使用 pywinauto 在 python 中创建一个脚本 以在后台自动安装 notepad 隐藏或最小化 notepad 只是一个示例 因为我将编辑它以与其他软件一起使用 Problem 问题是我想在安装程序
  • 如何改变Python中特定打印字母的颜色?

    我正在尝试做一个简短的测验 并且想将错误答案显示为红色 欢迎来到我的测验 您想开始吗 是的 祝你好运 法国的首都是哪里 法国 随机答案不正确的答案 我正在尝试将其显示为红色 我的代码是 print Welcome to my Quiz be
  • 从 pygame 获取 numpy 数组

    我想通过 python 访问我的网络摄像头 不幸的是 由于网络摄像头的原因 openCV 无法工作 Pygame camera 使用以下代码就像魅力一样 from pygame import camera display camera in
  • 如何从没有结尾的管道中读取 python 中的 stdin

    当管道来自 打开 时 不知道正确的名称 我无法从 python 中的标准输入或管道读取数据 文件 我有作为例子管道测试 py import sys import time k 0 try for line in sys stdin k k
  • glpk.LPX 向后兼容性?

    较新版本的glpk没有LPXapi 旧包需要它 我如何使用旧包 例如COBRA http opencobra sourceforge net openCOBRA Welcome html 与较新版本的glpk 注意COBRA适用于 MATL
  • 用于运行可执行文件的python多线程进程

    我正在尝试将一个在 Windows 上运行可执行文件并管理文本输出文件的 python 脚本升级到使用多线程进程的版本 以便我可以利用多个核心 我有四个独立版本的可执行文件 每个线程都知道要访问它们 这部分工作正常 我遇到问题的地方是当它们
  • 从 Python 中的类元信息对 __init__ 函数进行类型提示

    我想做的是复制什么SQLAlchemy确实 以其DeclarativeMeta班级 有了这段代码 from sqlalchemy import Column Integer String from sqlalchemy ext declar
  • 使用基于正则表达式的部分匹配来选择 Pandas 数据帧的子数据帧

    我有一个 Pandas 数据框 它有两列 一列 进程参数 列 包含字符串 另一列 值 列 包含相应的浮点值 我需要过滤出部分匹配列 过程参数 中的一组键的子数据帧 并提取与这些键匹配的数据帧的两列 df pd DataFrame Proce
  • 在 Python 类中动态定义实例字段

    我是 Python 新手 主要从事 Java 编程 我目前正在思考Python中的类是如何实例化的 我明白那个 init 就像Java中的构造函数 然而 有时 python 类没有 init 方法 在这种情况下我假设有一个默认构造函数 就像
  • Python 分析:“‘select.poll’对象的‘poll’方法”是什么?

    我已经使用 python 分析了我的 python 代码cProfile模块并得到以下结果 ncalls tottime percall cumtime percall filename lineno function 13937860 9
  • Pandas 与 Numpy 数据帧

    看这几行代码 df2 df copy df2 1 df 1 df 1 values 1 df2 ix 0 0 我们的教练说我们需要使用 values属性来访问底层的 numpy 数组 否则我们的代码将无法工作 我知道 pandas Data

随机推荐