Django celery Worker 将实时状态和结果消息发送到前端

2023-11-25

在 django 应用程序中,我正在运行异步任务,并希望向用户显示进度、错误等。如果出现错误,则应将用户重定向到需要额外输入或执行某些操作才能解决问题的页面。从 celery 工作返回到前端的最佳通信方式是什么?

这是伪代码的基本结构:

# views.py
from tasks import run_task

def view_task():
    run_task.delay()
    return render(request, 'template.html')

# tasks.py
from compute_module import compute_fct

@shared_task
def run_task():
    result = compute_fct()

    # how to catch status update messages from compute_module while compute_fct is running??

    if result == 'error':
        handle_error()
    else:
        handle_succes()     

# compute_module
import pandas as pd

def compute_fct():
    # send message: status = loading file
    df = pd.read_csv('test.csv')
    # send message: status = computing
    val = df['col'].mean()

    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}

我理想中想要的是:

  • compute_module.py模块使用 python 本机记录器。通过职责分离,我希望日志记录尽可能通用并使用标准的 python/django 记录器。但它们似乎并不是为了向前端发送消息而设计的。
  • celery 任务以某种方式处理日志,而不是在标准输出上显示它们,而是将它们重定向到推送器
  • 前端js显示并处理消息

芹菜工作人员和前端之间可能存在我不知道的标准通信方式。这种情况肯定经常发生,但我很惊讶它如此难以实现。在某种程度上,rabbitmq 消息队列或 aws sns 应该为此设计。以下是我查看过的资源,但感觉它们都效果不佳,但也许我只是感到困惑。

日志记录:这似乎更多的是在服务器端进行日志记录,而不是向用户发送消息

  • http://docs.celeryproject.org/en/latest/userguide/tasks.html#logging
  • https://docs.djangoproject.com/en/2.0/topics/logging/
  • http://oddbird.net/2017/04/17/async-notifications/
  • https://www.google.com/search?q=celery+worker+send+message+to+front+end

Celery cam 似乎是关于管理监控任务,而不是向用户发送消息

  • http://docs.celeryproject.org/en/latest/userguide/monitoring.html

我喜欢但我不想拥有的推手compute_module.py处理它。例如,我不想在内部进行任何 Pusher.com 集成compute_module.py。我想我可以传递一个已经实例化的推送对象,这样模块就可以推送消息,但我还是希望它是通用的

  • https://blog.pusher.com/improve-user-experience-app-real-time-progress-bar-tutorial/
  • https://blog.pusher.com/django-pusherable/

编辑:现在转移到 django-channels,效果很好,但比下面的解决方案更复杂。

以前的:

好的,下面是我现在如何解决它的伪代码。基本上我用https://pusher.com/docs/javascript_quick_start服务器端将实例化的对象传递给compute_module。一个缺点是推送消息是短暂的,所以我必须做一些额外的工作LogPusher将它们存储在数据库中,改天再做……

另外,在我的实际实现中,我通过$.post()ajax 调用$(document).ready()因为小任务完成得如此之快,用户永远不会看到推送消息,因为连接尚未建立(回到历史消息问题)。

我上面没有提到的另一种替代路线是https://channels.readthedocs.io/en/latest/

[编辑]另一个解决方案是服务器发送的事件其中有Django 实现,没测试过。但它看起来很适合单向更新,例如从服务器到客户端(相对于 websockets 双向)。你需要一个像这样的消息系统Redis 发布订阅获取服务器 sse 路由的更新。

通过推送器从 django 服务器进行前端更新:

# views.py
from tasks import run_task

def view_task():
    run_task.delay('event')
    return render(request, 'template.html', 'pusher_event':'event')

    
# tasks.py
import pusher
from django.conf import settings
from compute_module import compute_fct

class LogPusher(object):
    def __init__(self, event):
        self.pusher_client = pusher.Pusher(app_id=settings.PUSHER_APP_ID,
                        key=settings.PUSHER_KEY,
                        secret=settings.PUSHER_SECRET,
                        cluster=settings.PUSHER_CLUSTER, ssl=True)
        self.event = event
        
    def send(self, data):
        self.pusher_client.trigger(settings.PUSHER_CHANNEL, self.event, json.dumps(data))

@shared_task
def run_task(pusher_event):
    
    log_pusher = LogPusher(pusher_event)
    result = compute_fct(log_pusher)

    # how to catch status update messages from compute_module while compute_fct is running??

    if result == 'error':
            log_pusher.send('status':'error')
    else:
            log_pusher.send('status':'success')

            
# compute_module.py
import pandas as pd

def compute_fct(log_pusher):
    # send message: status = loading file
    log_pusher.send('status':'loading file')
    df = pd.read_csv('test.csv')
    # send message: status = computing
    log_pusher.send('status':'computing')
    val = df['col'].mean()

    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}
        

# context_processors.py
# see https://stackoverflow.com/questions/433162/can-i-access-constants-in-settings-py-from-templates-in-django
from django.conf import settings 

def pusher(request):
    return {'PUSHER_KEY': settings.PUSHER_KEY, 'PUSHER_CLUSTER': settings.PUSHER_CLUSTER , 'PUSHER_CHANNEL': settings.PUSHER_CHANNEL }

        
# template.html
<script>
    
var pusher = new Pusher("{{PUSHER_KEY}}", {
  cluster: "{{PUSHER_CLUSTER}}",
  encrypted: true    
});

var channel = pusher.subscribe("{{PUSHER_CHANNEL}}");
channel.bind("{{pusher_event}}", function(data) {
    // process data
});

</script>
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Django celery Worker 将实时状态和结果消息发送到前端 的相关文章

随机推荐

  • 将字符串转换为上午/下午格式的日期和时间[重复]

    这个问题在这里已经有答案了 string 2014 04 25 17 03 13 使用 SimpleDateFormat 足以格式化吗 或者 否则我会转向任何新的 API Date date new Date string DateForm
  • Java HashMap 调整大小

    假设我们有一些代码 class WrongHashCode public int code 0 Override public int hashCode return code public class Rehashing public s
  • 尝试使用 KeyCloak 进行 NODE.JS 身份验证时出现“无效参数:redirect_uri”

    我正在使用 Node JS express 和一个名为的 NPMkeycloak连接连接到 keycloak 服务器 当我实现所描述的默认机制来保护路由时 app get about keycloak protect function re
  • 取消 openfiledialog 时如何防止异常?

    我的程序有一个按钮 单击该按钮会打开一个打开文件对话框来选择图片 private string ChoosePicture fDialog Title Select Picture fDialog Filter Image Files bm
  • PyQt:如何创建可滚动窗口

    我认为在 PyQt 中创建可滚动窗口应该更容易 我有一个超出窗口的标签列表 我想向下滚动以查看它们 目前 代码没有给我错误 但窗口没有出现 class Example QWidget def init self super init lay
  • 用python检测unicode私有使用区域字符

    在python 3中识别unicode专用字符的正确方法是什么 模块中没有任何明显相关的内容unicodedata 这使得查找角色名称和属性变得容易 一些背景 unicodedata name 给出 unicode 字符的名称 将引发Val
  • 参数化 PDO 查询和“LIMIT”子句 - 不起作用[重复]

    这个问题在这里已经有答案了 我有这样的查询 SELECT imageurl FROM entries WHERE thumbdl IS NULL LIMIT 10 它与 PDO 和 MySQL Workbench 完美配合 它根据我的需要返
  • 在 C# 中,我如何知道要捕获哪些异常?

    我已经养成了使用通用 catch 语句的习惯 并以通用方式处理这些异常 这是不好的做法吗 如果是这样 我如何知道可以抛出哪些特定异常以及捕获哪些异常 是的 除了在一些非常具体的情况下 这是不好的做法 我能想到的一种常见情况是 捕获所有异常并
  • C++ 控制台应用程序中的 PlaySound?

    已编辑 因此代码是正确的 感谢 a spot is 底部有新问题 所以我一直在玩控制台 因为我在那个级别 我们被要求制作我们的第一个 项目 进行评估 我已经完成了基本的应用程序 但我想让它活跃一点并添加一些声音 将从控制台播放的声音 这个测
  • 如何在 Symfony 4 中覆盖第三方包的资源?

    我使用 Symfony Flex 进行了全新的 Symfony 安装 新的框架属于下一个 Symfony 4 目录结构 下一个 我将覆盖一些资源例如来自外部包的模板 翻译等 我尝试为模板创建所有这些路径 首先 但没有任何效果 templat
  • 如何在 git checkout 上仅恢复修改过的文件?

    假设我有一个包含数百个文件的目录 我修改了其中一些 但后来我意识到我的修改很糟糕 如果我做 git checkout whole folder 然后所有的东西都会被再次检查 我必须重新编译所有的东西 有没有办法让结账只影响修改过的文件 或者
  • LazyLoadingEnabled 设置似乎在 EF 5 中不起作用

    我首先将 EF 模型与 POCO 实体和自定义 DbContext 一起使用 我的问题是设置LazyLoadingEnabled false不会影响任何内容 导航属性仍会加载 下面是我的简化示例 实体计划 一个程序可以是其他程序的一部分 n
  • 如何在 Postgres 中搜索字符串中是否存在整个单词

    我有一张带有一列的桌子field具有像三星手机这样的价值 我的问题是 如果我搜索字符串 Samsung 或 phone 如何获得这一行 如果我只给出 Sam 或 ph 作为搜索词 我不想要任何结果 我曾尝试使用 ILIKE 运算符 但如果我
  • 在 VB.NET 中订阅事件

    我正在尝试将一些 C 代码转换为 VB NET 我在 C 中有以下内容 有效 m switchImageTimer new DispatcherTimer m switchImageTimer Interval Interval m swi
  • Java:将大量数据序列化到单个文件

    我需要将小对象的大量数据 大约 2gig 序列化到单个文件中 以便稍后由另一个 Java 进程处理 性能很重要 谁能建议一个好的方法来实现这一目标 你有没有看过谷歌的协议缓冲区 听起来像是它的一个用例
  • AMD多核编程

    我想开始编写应用程序 C 它将利用额外的核心来执行需要执行大量计算并且其计算彼此独立的代码部分 我有以下处理器 x64 Family 15 Model 104 Stepping 2 Authentic AMD 1900 Mhz 在 Wind
  • JavaScript 中的睡眠 - 操作之间的延迟

    有没有办法可以在 JavaScript 执行另一个操作之前让其休眠 Example var a 1 3 Sleep 3 seconds before the next action here var b a 4 您可以使用setTimeou
  • 如何在 xunit/autofixture 中组合 PropertyData 和 AutoNSubstituteData 属性?

    我正在使用 AutoNSubstituteData 属性 发布在这里 AutoFixture xUnit net 和自动模拟 我想将其与 PropertyData 来自 xunit 扩展的属性 这是我的测试 public static IE
  • 在 Android Studio Java 中读取文本文件

    我有一个类 QuoteBank 需要使用扫描仪读取 txt 文件 但它给了我一个文件未找到异常 java 文件位于 应用程序 src main java nate marxBros QuoteBank java txt 文件位于 应用程序
  • Django celery Worker 将实时状态和结果消息发送到前端

    在 django 应用程序中 我正在运行异步任务 并希望向用户显示进度 错误等 如果出现错误 则应将用户重定向到需要额外输入或执行某些操作才能解决问题的页面 从 celery 工作返回到前端的最佳通信方式是什么 这是伪代码的基本结构 vie