根据 Celery 任务状态更新 Django 模型字段

2024-01-14

在我的模型中,我有一个status默认值为“处理”的字段。在 Django 管理界面中,用户单击“保存”按钮后,表单输入将传递给仅休眠 30 秒的 celery 任务。

30 秒后,我该如何:

  1. 判断celery任务是否成功?
  2. 更新模型的status字段从“正在处理”到实际状态(例如:已完成、失败?

模型.py

from django.db import models

class Scorecard(models.Model):

    name = models.CharField(max_length=100, unique=True)
    status = models.CharField(max_length=20, default='Processing')

    def __str__(self):
        return self.name

admin.py

from django.contrib import admin
from scorecards.models import Scorecard
from scorecards.tasks import get_report

class ScorecardAdmin(admin.ModelAdmin):
    list_display = ['name', 'status']

    def save_model(self, request, obj, form, change):
        if form.is_valid():
            data = form.cleaned_data
            name = data['name']
            get_report.delay(name)
        super().save_model(request, obj, form, change)

admin.site.register(Scorecard, ScorecardAdmin)

tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
from time import sleep

@shared_task
def get_report(name):
    sleep(30)

celery 任务的实时状态更新status每 x 时间间隔进行一次字段就很好了,但现在我只是很好奇如何做到这一点。


我还没有弄清楚实时状态,但确实设法在任务完成后更改状态。

这是下面的主要部分。理解为什么这是有效的关键原因是我正在开始一个芹菜工人--pool=solo像这样:

celery -A scorecard worker --pool=solo -l info

这是一个单线程执行池(这对于我当前的目的来说很好),但这意味着它将处理第一个任务get_report(name),完成后,处理set_task_status(id)它检查结果的状态并将状态字段设置为实际状态。

模型.py

class Scorecard(models.Model):
    ....
    task_id = models.CharField(max_length=50)

admin.py

class ScorecardAdmin(admin.ModelAdmin):
    ...
            result = get_report.delay(name)
            set_task_status.delay(result.task_id)
    ...

tasks.py

@shared_task
def get_report(name):
    sleep(30)

@shared_task
def set_task_status(id):
    instance = Scorecard.objects.get(task_id=id)
    task_status = AsyncResult(id)
    instance.status = task_status.status
    instance.save()

这是我到目前为止所想到的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

根据 Celery 任务状态更新 Django 模型字段 的相关文章

随机推荐