观看 MongoDB 在 Python 中异步更改流

2024-03-04

我如何设置一个 python 服务来(异步)监视 mongodb 的更改流。

我能找到的一切mongodb.com https://www.mongodb.com/developer/quickstart/python-change-streams/ and pymongo 文档 https://pymongo.readthedocs.io/en/stable/api/pymongo/change_stream.html以下两种方法似乎还没有真正准备好投入生产:

访问 mongodb.com:

import os
import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('<YOUR-MONGO-CONNECT-STRING>')
change_stream = client.changestream.collection.watch()
for change in change_stream:
    print(dumps(change))

方法 pymongo 文档:

with db.collection.watch() as stream:
    while stream.alive:
        change = stream.try_next()
        print("Current resume token: %r" % (stream.resume_token,))
        if change is not None:
            print("Change document: %r" % (change,))
            continue
        time.sleep(10)

我考虑了一个使用 watch 函数作为事件循环回调的解决方案。 有人知道这个的实现吗?


获取所有 MongoDB 更新通知的简单方法如下:

import pymongo

client = pymongo.MongoClient("mongodb+srv://YOUR-MONGO-CONNECT-STRING" % (
            mongo_user, mongo_pass, mongo_db_name))

 option={ 'full_document':'updateLookup' }

 change_stream = client.mongo_db_name.mongo_db_collection.watch([{"$match" : { "operationType" : "update" }}], **option)
 for change in change_stream:
     print(dumps(change))
     print('')

您需要更换

  • YOUR-MONGO-CONNECT-STRING:包含您的 MongoDB 连接 URL
  • mongo_user:使用您的用户名
  • mongo_pass:使用您的密码
  • mongo_db_name:您想要观看的数据库名称
  • mongo_db_collection:带有您想要观看的集合名称

此外,您可以添加简历令牌以处理您将要处理的情况,例如出现网络错误。使用恢复令牌,流将再次连接,并且您将从丢失连接的那一刻收到通知。

import pymongo
from pymongo import errors

client = pymongo.MongoClient("mongodb+srv://YOUR-MONGO-CONNECT-STRING" % (
            mongo_user, mongo_pass, mongo_db_name))

try:
    resume_token = None
    pipeline = [{'$match': {'operationType': 'update'}}]
    with client.mongo_db_name.mongo_db_collection.watch(pipeline) as stream:
        for update_change in stream:
            print(update_change)
            resume_token = stream.resume_token
except pymongo.errors.PyMongoError:
    if resume_token is None:
       logging.error('...')
    else:
        with client.mongo_db_name.mongo_db_collection.watch(pipeline, resume_after=resume_token) as stream:
            for update_change in stream:
                print(update_change)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

观看 MongoDB 在 Python 中异步更改流 的相关文章

  • Django 代理模型的继承和多态性

    我正在开发一个我没有启动的 Django 项目 我面临着一个问题遗产 我有一个大模型 在示例中简化 称为MyModel这应该代表不同种类的物品 的所有实例对象MyModel应该具有相同的字段 但方法的行为根据项目类型的不同而有很大差异 到目
  • 通过 Scrapy 抓取 Google Analytics

    我一直在尝试使用 Scrapy 从 Google Analytics 获取一些数据 尽管我是一个完全的 Python 新手 但我已经取得了一些进展 我现在可以通过 Scrapy 登录 Google Analytics 但我需要发出 AJAX
  • 使 django 服务器可以在 LAN 中访问

    我已经安装了Django服务器 可以如下访问 http localhost 8000 get sms http 127 0 0 1 8000 get sms 假设我的IP是x x x x 当我这样做时 从同一网络下的另一台电脑 my ip
  • 如何在flask中使用g.user全局

    据我了解 Flask 中的 g 变量 它应该为我提供一个全局位置来存储数据 例如登录后保存当前用户 它是否正确 我希望我的导航在登录后在整个网站上显示我的用户名 我的观点包含 from Flask import g among other
  • Python(Selenium):如何通过登录重定向/组织登录登录网站

    我不是专业程序员 所以请原谅任何愚蠢的错误 我正在做一些研究 我正在尝试使用 Selenium 登录数据库来搜索大约 1000 个术语 我有两个问题 1 重定向到组织登录页面后如何使用 Selenium 登录 2 如何检索数据库 在我解决
  • 如何使用Conda下载python包并随后离线安装?

    我知道通过 pip 我可以使用以下命令下载 Python 包 但 pip install 破坏了我的内部包依赖关系 当我做 pip download
  • 如何在 Python 中检索 for 循环中的剩余项目?

    我有一个简单的 for 循环迭代项目列表 在某些时候 我知道它会破裂 我该如何退回剩余的物品 for i in a b c d e f g try some func i except return remaining items if s
  • PyUSB 1.0:NotImplementedError:此平台不支持或未实现操作

    我刚刚开始使用 pyusb 基本上我正在玩示例代码here https github com walac pyusb blob master docs tutorial rst 我使用的是 Windows 7 64 位 并从以下地址下载 z
  • 是否可以忽略一行的pyright检查?

    我需要忽略一行的pyright 检查 有什么特别的评论吗 def create slog group SLogGroup data Optional dict None SLog insert one SLog group group da
  • 从 Flask 访问 Heroku 变量

    我已经使用以下命令在 Heroku 配置中设置了数据库变量 heroku config add server xxx xxx xxx xxx heroku config add user userName heroku config add
  • BeautifulSoup 中的嵌套标签 - Python

    我在网站和 stackoverflow 上查看了许多示例 但找不到解决我的问题的通用解决方案 我正在处理一个非常混乱的网站 我想抓取一些数据 标记看起来像这样 table tbody tr tr tr td td td table tr t
  • 如何在ipywidget按钮中显示全文?

    我正在创建一个ipywidget带有一些文本的按钮 但按钮中未显示全文 我使用的代码如下 import ipywidgets as widgets from IPython display import display button wid
  • Python 的“zip”内置函数的 Ruby 等价物是什么?

    Ruby 是否有与 Python 内置函数等效的东西zip功能 如果不是 做同样事情的简洁方法是什么 一些背景信息 当我试图找到一种干净的方法来进行涉及两个数组的检查时 出现了这个问题 如果我有zip 我可以写这样的东西 zip a b a
  • Jupyter Notebook 内核一直很忙

    我已经安装了 anaconda 并且 python 在 Spyder IPython 等中工作正常 但是我无法运行 python 笔记本 内核被创建 它也连接 但它始终显示黑圈忙碌符号 防火墙或防病毒软件没有问题 我尝试过禁用两者 我也无法
  • 如何在 Python 中追加到 JSON 文件?

    我有一个 JSON 文件 其中包含 67790 1 kwh 319 4 现在我创建一个字典a dict我需要将其附加到 JSON 文件中 我尝试了这段代码 with open DATA FILENAME a as f json obj js
  • 如何计算 pandas 数据帧上的连续有序值

    我试图从给定的数据帧中获取连续 0 值的最大计数 其中包含来自 pandas 数据帧的 id date value 列 如下所示 id date value 354 2019 03 01 0 354 2019 03 02 0 354 201
  • 使用其构造函数初始化 OrderedDict 以便保留初始数据的顺序的正确方法?

    初始化有序字典 OD 以使其保留初始数据的顺序的正确方法是什么 from collections import OrderedDict Obviously wrong because regular dict loses order d O
  • Scrapy:如何使用元在方法之间传递项目

    我是 scrapy 和 python 的新手 我试图将 parse quotes 中的项目 item author 传递给下一个解析方法 parse bio 我尝试了 request meta 和 response meta 方法 如 sc
  • 管理员未授权 Mongodb 执行 listDatabases 命令

    删除 mongodb 用户后 无法重新连接 mongo 没有 验证 我创造了超级用户 https docs mongodb org v2 6 reference built in roles superuser roles tomuser
  • 导入错误:没有名为 site 的模块 - mac

    我已经有这个问题几个月了 每次我想获取一个新的 python 包并使用它时 我都会在终端中收到此错误 ImportError No module named site 我不知道为什么会出现这个错误 实际上 我无法使用任何新软件包 因为每次我

随机推荐

  • 在 smartGWT 中打开/保存文件

    我已经实现了 RPCService RPCServiceAsync 和 RPCServieImpl 单击按钮后 将调用服务器端的服务 它将从数据库中获取数据并创建文件 创建文件后 我需要在客户端打开该文件 并需要提示一个包含打开 保存选项的
  • ServiceStack - 依赖关系似乎没有被注入?

    我有以下存储库类 public class Repository
  • 微服务架构中的数据库位置

    我们有一个整体应用程序 现在正在使用容器将其转换为微服务架构 我们的微服务是stateful 即他们需要从数据库插入 检索数据 根据微服务架构 每个微服务应该有自己的数据 即我们案例中的数据库 我的问题是where应该部署每个微服务的数据库
  • 实体框架数据库优先与 Oracle 数据库

    我正在开发一个 ASP NET WebForms 应用程序 其中实体框架数据库优先与 SQL Server 连接 并且我想将相同的实体数据模型与 Oracle 数据库连接 我的担忧是 如何在我的开发机器上安装 Oracle 数据库引擎进行测
  • 使用用户定义的运行时属性的 UIView Shadow

    当我使用 用户定义的运行时属性 时 我很难显示阴影 如果我使用代码 它似乎工作完全正常 如下所示 func formatView view UIView cornerRadius Bool if cornerRadius view laye
  • Python Selenium Javascript链接点击无法执行

    我将 Selenium for Python 与 PhantomJS Ghost 驱动程序一起使用 以便单击 href 中包含 Javascript 的链接 例如来自this https structuredginniemaes ginni
  • MongoDB 根据 _id 统计每分钟新文档数

    我想创建每分钟存储多少新文档的统计数据 由于具有标准 ObjectID 的 id 字段已经包含文档创建的时间戳 我认为应该可以以某种方式使用它 在 Stackoverflow 上 我发现了以下映射归约代码 可以在有用于创建数据的专用字段时完
  • 从具有特定窗口坐标的命令行启动 Google Chrome

    我正在尝试找到一个 shell 命令 该命令将使用特定的 x 和 y 坐标打开 Google Chrome 以便我可以在窗口打开时设置窗口的位置 是否可以使用命令行参数来执行此操作 我需要修改以下命令才能实现此目的 google chrom
  • python中不均匀的子图

    在 python 中创建 3 3 子图矩阵的最佳方法是什么 第一列包含 3 个子图 第二列包含 3 个子图 第三列包含2 个次要情节 最后两个子图应具有相同的大小 这意味着它们将在其他两列的中间图的中间相遇 我尝试使用 gridspec 来
  • 当应用程序未处于焦点或位于另一个选项卡中时,WaitForSeconds 停止工作

    我用 Unity 制作了 WebGL 游戏 不需要每个帧都进行一些计算 因此我将它们放在协程中 但当游戏在后台运行时 协程的工作速度会比平时慢 并且会额外等待 5 10 秒 例如 士兵跑向我 我向他们发射子弹以杀死他们 当他们进入射程时 他
  • 引导程序上的下拉子菜单不起作用

    我只是想问为什么引导程序上的下拉子菜单不起作用 我只是按照此链接中的说明操作 http getbootstrap com 2 3 2 components html http getbootstrap com 2 3 2 component
  • 从 shell 脚本将密码输入 openssl 命令

    我正在尝试将 p12 从 shell 脚本转换为 pem 无需任何用户输入 我可以将密码作为脚本中的变量 所以当我打电话时 openssl pkcs12 in p12 out cert pem nodes 终端打印 输入导入密码 并等待输入
  • Jersey 2.x 自定义注入注释与属性

    我正在从 DropWizard 0 7 1 迁移到 0 8 1 这包括从 Jersey 1 x 迁移到 2 x 在我的 使用 Jersey 1 18 1 的实现 我有一个MyProvider 为了简单起见 更改了所有类名 实现Injecta
  • 如何避免SRP混乱?

    通过应用 SRP 原则 您必然会有很多课程 如果这对于小型项目来说效果很好 那么您如何处理和组织大型项目中的类数量 你如何组织文件夹结构 你怎么记得你建造了什么 你怎么知道其他人是否没有在其他类中构建相同的功能 这适用于所有类型的图书馆 不
  • 在 VBA 中滚动网页时等待窗口重新加载

    我编写了一个 VBA 宏来计算 Google 搜索特定术语时返回的 大约 图像数量 我的意思是 程序应该计算返回的图像数量 向下滚动以加载更多图像 如果适用 最多可计算 400 个图像 这是 简化的 代码 Sub GoogleCount C
  • 如何在 C# 中运行同步计时器?

    我正在编写一个应用程序 它使用计时器在屏幕上显示某些事件发生时的倒计时 我想重用计时器 因为它对于应用程序中的一些操作会很方便 因此我指定了要环绕计时器的单词 例如 以下函数调用 CountdownTimer 90 You have unt
  • 为 git 子模块指定分支?

    我已经将 git 子模块添加到我的 git 存储库中并且工作正常 在我的 父 存储库中 我创建了一个功能分支 我的特征这需要对子模块进行一些更改 但我不想影响使用相同子模块的其他团队 因此我在子模块存储库上创建了相应的功能分支子模块功能有一
  • 如何在路由器上注册单个视图(不是视图集)?

    我正在使用 Django REST 框架 并一直在尝试创建一个返回少量信息的视图 并将其注册到我的路由器上 我有四个存储信息的模型 它们都有一个created time场地 我正在尝试创建一个返回最新对象的视图 基于created time
  • 使用 JSoup 作为服务显示超链接的 ListView

    我最近需要收集并显示超链接列表 这很有帮助example https stackoverflow com a 73160763 230513说明使用jsoup questions tagged jsoup and a Task
  • 观看 MongoDB 在 Python 中异步更改流

    我如何设置一个 python 服务来 异步 监视 mongodb 的更改流 我能找到的一切mongodb com https www mongodb com developer quickstart python change stream