在数据库中保存 celery 任务(用于重新运行)

2023-12-19

我们的工作流程目前是围绕旧版本的 celery 构建的,因此请记住,事情已经不是最佳的。我们需要运行一个任务并将该任务运行的记录保存在数据库中。如果该任务失败或挂起(这种情况经常发生),我们希望重新运行,就像第一次运行一样。但这不应该自动发生。它需要根据故障的性质手动触发,并且需要将结果记录在数据库中以做出决定(通过前端)。

我们如何在数据库中保存任务的完整记录,以便后续进程可以获取该记录并运行新的相同任务?当前的实现保存了路径@task数据库中的装饰函数作为TaskInfo模型。当任务需要重新运行时,我们有一个get_task()方法上的TaskInfo从数据库获取路径的模型,使用以下命令导入它getattr,还有另一个rerun()再次运行任务的方法*args, **kwargs(也保存在数据库中)。

像这样(这些是方法TaskInfo模型实例):

def get_task(self):
    """Returns the task's decorated function, which can be delayed."""
    module_name, object_name = self.path.rsplit('.', 1)
    module = import_module(module_name)
    task = getattr(module, object_name)
    if inspect.isclass(task):
        task = task()
    # task = current_app.tasks[self.path]
    return task

 def rerun(self):
    """Re-run the task, and replace this one.

    - A new task is scheduled to run.
    - The new task's TaskInfo has the same parent as this TaskInfo.
    - This TaskInfo is deleted.
    """
    args, kwargs = self.get_arguments()
    celery_task = self.get_task()
    celery_task.delay(*args, **kwargs)
    defaults = {
        'path': self.path,
        'status': Status.PENDING,
        'timestamp': timezone.now(),
        'args': args,
        'kwargs': kwargs,
        'parent': self.parent,
    }
    TaskInfo.objects.update_or_create(task_id=celery_task.id, defaults=defaults)
    self.delete()

必须有一个更干净的解决方案来将任务保存在数据库中以便稍后重新运行,对吧?


最新版本的 Celery (4.4.0) 包含一个参数extended_result。您可以将其设置为 True,然后该表(它的名称为celery_taskmeta默认情况下)在Result Backend Database将存储args and kwargs的任务。

这是一个演示:

app = Celery('test_result_backend')

app.conf.update(
    broker_url='redis://localhost:6379/10',
    result_backend='db+mysql://root:passwd@localhost/celery_toys',
    result_extended=True
)


@app.task(bind=True, name='add')
def add(self, x, y): 
    self.request.task_name = 'add'  # For saving the task name.
    time.sleep(5)
    return x + y 

通过MySQL中记录的任务信息,您可以轻松地重新运行您的任务。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在数据库中保存 celery 任务(用于重新运行) 的相关文章

  • “unicode”对象没有属性“_meta”

    我正在尝试创建一个视图 用户在其中从下拉菜单中选择一个选项 提交它 然后返回一些数据 具体来说 他们将从数据库中的模型中进行选择 并返回该类的所有实例 我使用 django tables2 输出数据 以便可以排序 但这是我的症结所在 Vie
  • django.db.utils.IntegrityError:NOT NULL 约束失败:app.area_id

    当我跑步时 manage py migrate 发生错误django db utils IntegrityError NOT NULL constraint failed app area id models py is class Are
  • Django 星级评定系统和 AJAX

    我正在尝试在 Django 网站上实现星级评级系统 在我的模型中存储评级是排序的 就像在页面上显示分数一样 但我希望用户能够对页面进行评分 基本上从 1 到 5 而无需刷新或更改页面 我发现了以下内容 并且喜欢这里明星的风格 http jv
  • Django 中的行级权限

    有没有办法在 django 中进行行级权限 我以为没有 但只是在文档中注意到了这一点 权限不仅可以按对象类型设置 还可以按对象设置 具体的对象实例 通过使用 has add permission 提供了 has change permiss
  • 自定义 django-rest-framework-simplejwt 的 JWT 响应

    我正在设置 Django 来发送 JWT 响应而不是视图 我尝试使用 django rest framework simplejwt 这个框架中提供了一个函数TokenObtainPairView as view 返回一对 jwt 我需要使
  • HTML - 使用 JS 根据值更改文本颜色

    我正在使用 Django 创建一个以 HTML 形式显示的表格 我想当数字为负数时将数字的颜色更改为红色 当数字为正数时将数字的颜色更改为绿色 我知道我需要使用 JS 来实现这一点 但我无法让它工作 任何帮助将不胜感激 这是我的 Djang
  • 本地 React 前端、Django REST Framework 后端(在 CORS 下访问 CSRF cookie 时出现问题)

    我正在创建一个带有 React 前端和 Django REST Framework 后端的 Web 应用程序 由于某些情况 我必须在本地开发 React 前端 而后端服务器位于远程位置 后端服务器要求我在登录后的每个 POST 中使用 CS
  • Django Rest Framework 分页设置 - 内容范围

    6 30 15 我怎样才能让这个问题变得更好并且对其他人更有帮助 反馈会有帮助 谢谢 我使用 DRF 作为 Dojo Dgrid Web 应用程序的服务器端 Dojo 需要来自服务器的内容范围或范围响应 目前它不发送任何内容 因此 dgri
  • 如何在 django 模板中将名称反转为绝对 url?

    url url name 给出一个相对名称 我怎样才能做类似的事情 absolute url url name 这样它就会返回带基数的 url 包括端口 如果存在 有不同的解决方案 编写您自己的模板标签并使用 HttpRequest bui
  • Tastypie 与 application/x-www-form-urlencoded

    我有点难以弄清楚下一步应该做什么 我正在使用 tastypie 为我的 Web 应用程序创建 API 从另一个应用程序 特别是 ifbyphone com 我收到一个没有标题的 POST 如下所示 post data http myapp
  • 使用 nginx 在云上部署 django 和 React

    我有一个 digitalocean 服务器 并且已经使用 Gunicorn 和 nginx 部署了 Django 后端服务器 如何在同一台服务器上部署 React 应用程序 您可以构建 React 应用程序并使用 Nginx 提供其静态文件
  • Django:管理中的 AJAX ManyToManyField

    我要显示ManyToManyFields 在 admin 中就像filter horizontal确实如此 但会在用户在过滤器字段中键入内容时填充选项 有很多选项 一次性加载它们需要很多时间 I found django ajax 过滤字段
  • 如何发送正确的授权标头以进行基本身份验证

    我正在尝试从 API 发布数据 但无法通过基本身份验证 I try ajax type POST url http theappurl com api v1 method data crossDomain true beforeSend f
  • 对原始模型进行预过滤后的相关对象的 Django 查询集

    给定一个模型的查询集 我想获取通过外键相关的另一个模型的查询集 采用 Django 项目文档的博客架构 class Blog models Model name models CharField max length 100 tagline
  • 模型字段的随机/非常量默认值?

    我有一个看起来像这样的模型 class SecretKey Model user ForeignKey User related name secret keys created DateTimeField auto now add Tru
  • Django REST序列化器:创建对象而不保存

    我已经开始使用 Django REST 框架 我想做的是使用一些 JSON 发布请求 从中创建一个 Django 模型对象 然后使用该对象而不保存它 我的 Django 模型称为 SearchRequest 我所拥有的是 api view
  • InterfaceError:连接已关闭(使用 django + celery + Scrapy)

    当我在 Celery 任务中使用 Scrapy 解析函数 有时可能需要 10 分钟 时 我得到了这个信息 我用 姜戈 1 6 5 django celery 3 1 16 芹菜 3 1 16 psycopg2 2 5 5 我也使用了psyc
  • ExpectedFailure 被计为错误而不是通过

    我在用着expectedFailure因为有一个我想记录的错误 我现在无法修复 但想将来再回来解决 我的理解expectedFailure是它会将测试计为通过 但在摘要中表示预期失败的数量为 x 类似于它如何处理跳过的 tets 但是 当我
  • 在 Django 中定期运行一个函数

    我在 Django 中编写一个应用程序来监视一组服务器中的某些服务 我希望定期更新观点 到目前为止 我已经研究过编写自定义管理命令 链接在这里 http docs djangoproject com en dev howto custom
  • 如何在 Django 中使用并发进程记录到单个文件而不使用独占锁

    给定一个在多个服务器上同时执行的 Django 应用程序 该应用程序如何记录到单个共享日志文件 在网络共享中 而不保持该文件以独占模式永久打开 当您想要利用日志流时 这种情况适用于 Windows Azure 网站上托管的 Django 应

随机推荐