Flask 或 Pyramid 中的简单网络 UDP 监听

2024-05-23

我需要创建一个 Web 应用程序来显示通过定期传入 UDP 数据包提供的数据。该站点可能位于 Flask(可能是 Pyramid)中,部署在 Nginx 下。如何创建一个非常简单的后台任务(基本上只是 socket.recv())来侦听任何传入数据包,并将数据推送到全局可访问列表中?

我可以简单地从 main() 生成一个线程来执行此操作,还是需要使用 Celery 或 PyRes 之类的东西?

感谢您的任何指导。


你必须使用芹菜,但你很幸运,因为已经有一个集成了 celery 的 Flask 扩展 https://flask.palletsprojects.com/en/2.0.x/patterns/celery/。你必须pip install flask, pip install flask-celery, pip install redis并且您的系统上需要有一个 redis 服务器。

import socket, select, Queue

from flask import Flask
from celery import Celery


def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)
socket_queue = Queue.Queue()


@celery.task()
def listen_to_udp():
    """
    This code was taken from 
    https://stackoverflow.com/questions/9969259/python-raw-socket-listening-for-udp-packets-only-half-of-the-packets-received
    """
    s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s1.bind(('0.0.0.0', 1337))
    s2 = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
    s2.bind(('0.0.0.0', 1337))
    while True:
        r, w, x = select.select([s1, s2], [], [])
        for i in r:
            socket_queue.put((i, i.recvfrom(131072)))

@app.route("/")
def test_home():
    listen_to_udp.delay()
    print(socket_queue.get())

if __name__ == "__main__":
    #run install.py to install dependencies and create the database
    app.run(host="0.0.0.0", port=5000, debug=True)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flask 或 Pyramid 中的简单网络 UDP 监听 的相关文章

  • 类的 IPython 表示

    我正在使用我创建的模块尝试 IPython 但它没有显示类对象的实际表示 相反 它显示类似的内容 TheClass module TheClass name I heavily在这个模块中使用元类 我有真正有意义的类表示 应该向用户显示 是
  • Python - 比较同一字典中的值

    我有一本字典 d Trump MAGA FollowTheMoney Clinton dems Clinton Stein FollowTheMoney Atlanta 我想删除字符串列表中的重复字符串 该字符串是键的值 对于这个例子 期望
  • 计算另一个字符串中多个字符串的出现次数

    在 Python 2 7 中 给定以下字符串 Spot是一只棕色的狗 斑点有棕色的头发 斑点的头发是棕色的 查找字符串中 Spot brown 和 hair 总数的最佳方法是什么 在示例中 它将返回 8 我正在寻找类似的东西string c
  • Gunicorn 工作人员无论如何都会超时

    我正在尝试通过gunicorn运行一个简单的烧瓶应用程序 但是无论我做什么 我的工作人员都会超时 无论是否有针对应用程序的活动 工作人员在我设置任何内容后总是会超时timeout值到 是什么导致它们超时 当我发出请求时 请求成功通过 但工作
  • 在 Django Admin 中调整字段大小

    在管理上添加或编辑条目时 Django 倾向于填充水平空间 但在某些情况下 当编辑 8 个字符宽的日期字段或 6 或 8 个字符的 CharField 时 这确实是一种空间浪费 字符宽 然后编辑框最多可容纳 15 或 20 个字符 我如何告
  • Python 3d 绘图设置固定色阶

    我正在尝试绘制两个 3d 数组 第一个数组的 z 值在范围内 0 15 0 15 第二个来自 0 001 0 001 当我绘图时 色标自动遵循数据范围 如何设置自定义比例 我不想看到 0 001 的浅色 而应该看到 0 15 的浅色 如何修
  • PyQt 使用 ctrl+Enter 触发按钮

    我正在尝试在我的应用程序中触发 确定 按钮 我当前尝试的代码是这样的 self okPushButton setShortcut ctrl Enter 然而 它不起作用 这是有道理的 我尝试查找一些按键序列here http ftp ics
  • Pycharm 在 os.path 连接上出现“未解析的引用”

    将pycharm升级到2018 1 并将python升级到3 6 5后 pycharm报告 未解析的引用 join 最新版本的 pycharm 不会显示以下行的任何警告 from os path import join expanduser
  • Python 3:将字符串转换为变量[重复]

    这个问题在这里已经有答案了 我正在从 txt 文件读取文本 并且需要使用我读取的数据之一作为类实例的变量 class Sports def init self players 0 location name self players pla
  • 未知错误:Chrome 无法启动:异常退出

    当我使用 chromedriver 对 Selenium 运行测试时 出现此错误 selenium common exceptions WebDriverException Message unknown error Chrome fail
  • 嵌套作用域和 Lambda

    def funct x 4 action lambda n x n return action x funct print x 2 prints 16 我不太明白为什么2会自动分配给n n是返回的匿名函数的参数funct 完全等价的定义fu
  • 尽管我已在 python ctypes 中设置了信号处理程序,但并未调用它

    我尝试过使用 sigaction 和 ctypes 设置信号处理程序 我知道它可以与python中的信号模块一起使用 但我想尝试学习 当我向该进程发送 SIGTERM 时 但它没有调用我设置的处理程序 只打印 终止 为什么它不调用处理程序
  • Django REST Framework - CurrentUserDefault 使用

    我正在尝试使用CurrentUserDefault一个序列化器的类 user serializers HiddenField default serializers CurrentUserDefault 文档说 为了使用它 请求 必须作为
  • 将 Matlab 的 datenum 格式转换为 Python

    我刚刚开始从 Matlab 迁移到 Python 2 7 在读取 mat 文件时遇到一些问题 时间信息以 Matlab 的日期数字格式存储 对于那些不熟悉它的人 日期序列号将日历日期表示为自固定基准日期以来已经过去的天数 在 MATLAB
  • Python GTK+ 画布

    我目前正在通过 PyGobject 学习 GTK 需要画布之类的东西 我已经搜索了文档 发现两个小部件似乎可以完成这项工作 GtkDrawingArea 和 GtkLayout 我需要一些基本函数 如 fillrect 或 drawline
  • 找到一个数字所属的一组范围

    我有一个 200k 行的数字范围列表 例如开始位置 停止位置 该列表包括除了非重叠的重叠之外的所有类型的重叠 列表看起来像这样 3 5 10 30 15 25 5 15 25 35 我需要找到给定数字所属的范围 并对 100k 个数字重复该
  • 带有 LSTM 的 GridSearchCV/RandomizedSearchCV

    我一直在尝试通过 RandomizedSearchCV 调整 LSTM 的超参数 我的代码如下 X train X train reshape X train shape 0 1 X train shape 1 X test X test
  • 制作一份 Python 文档的 PDF 文件

    Python 官方网站提供 PDF 文档下载 但它们是按章节分隔的 我下载了源代码并构建了 PDF 文档 这些文档也是单独的 PDF 我怎么能够从源代码中的 Makefile 构建一个 PDF 文件 我认为这样阅读起来会更方便 如果连接单独
  • pandas.read_csv 将列名移动一倍

    我正在使用位于的 ALL zip 文件here http www fec gov disclosurep PDownload do 我的目标是用它创建一个 pandas DataFrame 但是 如果我跑 data pd read csv
  • pytest找不到模块[重复]

    这个问题在这里已经有答案了 我正在关注pytest 良好实践 https docs pytest org en latest explanation goodpractices html test discovery或者至少我认为我是 但是

随机推荐

  • JPA Criteria查询Path.get left join是否可以

    我对 JPA 标准有疑问 这是我的 JPA 标准查询 CriteriaBuilder criteriaBuilder getEm getCriteriaBuilder CriteriaQuery
  • 有没有办法从 PhantomJS 的键盘读取用户输入?

    我使用 PhantomJS 登录网站 必须手动输入验证码 如何将验证码图像保存到磁盘 然后在 PhantomJS 控制台中手动输入验证码 我遇到了同样的问题 只需将系统模块与 page render 和一些传递给 page evaluate
  • 应用程序在加载 xml 布局文件的主线程中做了太多工作

    我正在制作一个 9x9 数独网格 其中 81 个单元格本身就是一个 3x3 网格 单个细胞看起来像这样 1 2 3 4 5 6 7 8 9 每个数字代表该单元格的铅笔注释 我有一个名为 cell layout xml 的文件 表示这种 3x
  • Awk - 计算两个文件之间的每个唯一值和匹配值

    我有两个文件 首先 我尝试获取第 4 列中每个唯一字段的计数 然后匹配第二个文件的第二列中的唯一字段值 File1 第 4 列的每个唯一值和 File2 第 2 列包含我需要在两个文件之间匹配的值 所以本质上 我试图 gt 如果 file2
  • 数据未保存为加密数据 django

    到目前为止 我尝试了超过 6 个插件 但现在很沮丧 现在用的是这个密码学 https pypi org project django cryptography 一切都很好并相应地完成 但是当我像这样在模型管理器中保存数据时 def crea
  • 如何使用 sed 将空格替换为 \(space)?

    当我使用 sed 将所有空格替换为 X 时 该命令有效 命令为 sed s X g filelist tmp 但是 当我尝试用 space 替换所有出现的空格时 代码是 sed s g filelist tmp 这不起作用 我究竟做错了什么
  • 如何为非重复计数创建单独的度量值组

    我正在向多维数据集添加不同的订单计数度量 我必须将其放在单独的度量组中以获得更好的性能 当我单击 新度量组 按钮时 会出现一个对话框 从源视图中选择一个表 我的 Facts sales 表不在列表中 因为该表已被其他度量值组 Facts s
  • 如何以最短的停机时间移交 TCP 侦听套接字?

    虽然这个问题被标记为 EventMachine 任何语言的通用 BSD 套接字解决方案也非常受欢迎 一些背景 我有一个应用程序正在侦听 TCP 套接字 它通过常规的 System V 风格的 init 脚本启动和关闭 我的问题是它需要一些时
  • 除了标准/渐进之外,第三种JPEG压缩:按通道加载?

    这个问题可能是一个 开放式问题 你们中的许多人可能渴望结束它 但请不要这样做 让我解释 众所周知 JPEG有两种压缩方式 至少在Photoshop保存对话框中 优化 图像加载有点像逐行加载 渐进式 图像首先像马赛克一样加载 逐渐更好 直到原
  • 将 PHP 中的 openssl AES 转换为 Python AES

    我有一个 php 文件 如下所示 encryption encoded key c7e1wJFz PBwQix80D1MbIwwOmOceZOzFGoidzDkF5g function my encrypt data key encrypt
  • 函数式语言中的部分求值和函数内联有什么区别?

    我知道 函数内联就是用函数定义代替函数调用 部分评估是在编译时评估程序的已知 静态 部分 在 C 等命令式语言中 两者之间存在区别 其中运算符与函数不同 但是 在像 Haskell 这样的函数式语言 其中运算符也是函数 中 两者之间有什么区
  • Jmeter 和 Bitbucket 服务器负载测试

    我是 Jmeter 的新手 我有一个本地托管的 Bitbucket 服务器 有时 当 Bamboo plan 触发并发 git 克隆操作时 会发现 Bitbucket 服务器变得缓慢 无响应 我必须重新启动服务 我想通过对另一个本地创建的
  • 更高效的 LINQ 查询

    有人可以帮我将此查询循环变成高效的 Linq 查询吗 我将其加载到 TreeView 中 因此必须附加每个项目 包含也非常低效 延迟加载项目也不起作用 事实上 这个查询访问数据库的次数比应有的要多 public IQueryable
  • 禁用外部点击时关闭模式

    我正在制作一些使用模式的博客物质化 但我的模态 onclick 外部和错误数据有问题 这是我的代码 main js function changepassword var user userlog val var content conte
  • 致命错误:在 flutter 中找不到“Flutter/Flutter.h”文件

    这是错误 在文件中包含来自 Users chetan pub cache hosted pub dartlang org webview flutter 1 0 7 ios Classes JavaScriptChannelHandler
  • LLVM 互操作性(如 JVM 或 .Net)- 可以吗?

    我最近尝试了一些不同的 LLVM 前端 例如 Clang C Familiy LDC2 D Terra 所有这些语言都可以编译成 LLVM IR 有点可读 和 LLVM IR Bitcode 那么现阶段他们都处于同一 水平 对吗 我的问题是
  • 将值设置为输入字段时,西班牙语文本无法正确显示

    我正在尝试将西班牙语文本设置到输入字段 但它似乎没有正确显示 但是 如果相同的文本位于 div 内部或已设置为输入字段的值 则该文本可以正常显示 HTML div Cuenta de Ahorros Persona F iacute sic
  • 不同类型的 C++ 对称二元运算符

    我正在学习 C 我想知道是否可以深入了解创建适用于两种不同类型实例的二元运算符的首选方法 这是我为了说明我的担忧而制作的一个例子 class A class B class A private int x public A int x in
  • golang.org 包和标准库之间的区别

    我使用 go 已经有一段时间了 我注意到 Go 标准库 和 golang org x 之间存在重复的包 我的问题是 为什么它们被释放两次 在这两者中 我应该使用哪一个 更新的 规范的等 到目前为止我注意到的一些示例包已发布两次 golang
  • Flask 或 Pyramid 中的简单网络 UDP 监听

    我需要创建一个 Web 应用程序来显示通过定期传入 UDP 数据包提供的数据 该站点可能位于 Flask 可能是 Pyramid 中 部署在 Nginx 下 如何创建一个非常简单的后台任务 基本上只是 socket recv 来侦听任何传入