Python Asyncio run_forever() 和任务

2024-05-02

我改编了这段代码,以便在异步 Python 中使用 Google Cloud PubSub:https://github.com/cloudfind/google-pubsub-asyncio https://github.com/cloudfind/google-pubsub-asyncio

import asyncio
import datetime
import functools
import os

from google.cloud import pubsub
from google.gax.errors import RetryError
from grpc import StatusCode

async def message_producer():
    """ Publish messages which consist of the current datetime """
    while True:
        await asyncio.sleep(0.1)


async def proc_message(message):
    await asyncio.sleep(0.1)
    print(message)
    message.ack()


def main():
    """ Main program """
    loop = asyncio.get_event_loop()

    topic = "projects/{project_id}/topics/{topic}".format(
        project_id=PROJECT, topic=TOPIC)
    subscription_name = "projects/{project_id}/subscriptions/{subscription}".format(
        project_id=PROJECT, subscription=SUBSCRIPTION)

    subscription = make_subscription(
        topic, subscription_name)

    def create_proc_message_task(message):
        """ Callback handler for the subscription; schedule a task on the event loop """
        print("Task created!")
        task = loop.create_task(proc_message(message))

    subscription.open(create_proc_message_task)
    # Produce some messages to consume

    loop.create_task(message_producer())

    print("Subscribed, let's do this!")
    loop.run_forever()


def make_subscription(topic, subscription_name):
    """ Make a publisher and subscriber client, and create the necessary resources """
    subscriber = pubsub.SubscriberClient()
    try:
        subscriber.create_subscription(subscription_name, topic)
    except:
        pass
    subscription = subscriber.subscribe(subscription_name)

    return subscription


if __name__ == "__main__":
    main()

我基本上删除了发布代码,只使用订阅代码。 然而,最初我并没有包括loop.create_task(message_producer())线。我认为任务是按预期创建的,但它们从未真正运行过。仅当我添加上述行时,代码才能正确执行并且所有创建的任务都会运行。是什么导致了这种行为?


PubSub 正在调用create_proc_message_task来自不同线程的回调。自从create_task is 不是线程安全的 https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading,它只能从运行事件循环的线程(通常是主线程)调用。要纠正该问题,请更换loop.create_task(proc_message(message)) with asyncio.run_coroutine_threadsafe(proc_message(message), loop) and message_producer将不再需要。

至于为什么message_producer似乎修复了代码,请考虑run_coroutine_threadsafe https://docs.python.org/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafecreate_task:

  • 它以线程安全的方式运行,因此并发执行此操作时事件循环数据结构不会损坏。
  • 它确保事件循环尽快唤醒,以便它可以处理新任务。

在你的情况下create_task将任务添加到循环的可运行队列中(没有任何锁定),但无法确保唤醒,因为在事件循环线程中运行时不需要唤醒。这message_producerthen 用于强制循环定期唤醒,此时它还检查并执行可运行任务。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python Asyncio run_forever() 和任务 的相关文章

  • python:查找围绕某个 GPS 位置的圆的 GPS 坐标的优雅方法

    我有一组以十进制表示的 GPS 坐标 并且我正在寻找一种方法来查找每个位置周围半径可变的圆中的坐标 这是一个例子 http green and energy com downloads test circle html我需要什么 这是一个圆
  • 使用特定的类/函数预加载 Jupyter Notebook

    我想预加载一个笔记本 其中包含我在另一个文件中定义的特定类 函数 更具体地说 我想用 python 来做到这一点 比如加载一个配置文件 包含所有相关的类 函数 目前 我正在使用 python 生成笔记本并在服务器上自动启动它们 因为不同的
  • 如何用python脚本控制TP LINK路由器

    我想知道是否有一个工具可以让我连接到路由器并关闭它 然后从 python 脚本重新启动它 我知道如果我写 import os os system ssh l root 192 168 2 1 我可以通过 python 连接到我的路由器 但是
  • 处理 Python 行为测试框架中的异常

    我一直在考虑从鼻子转向行为测试 摩卡 柴等已经宠坏了我 到目前为止一切都很好 但除了以下之外 我似乎无法找出任何测试异常的方法 then It throws a KeyError exception def step impl contex
  • 跟踪 pypi 依赖项 - 谁在使用我的包

    无论如何 是否可以通过 pip 或 PyPi 来识别哪些项目 在 Pypi 上发布 可能正在使用我的包 也在 PyPi 上发布 我想确定每个包的用户群以及可能尝试积极与他们互动 预先感谢您的任何答案 即使我想做的事情是不可能的 这实际上是不
  • 删除flask中的一对一关系

    我目前正在使用 Flask 开发一个应用程序 并且在删除一对一关系中的项目时遇到了一个大问题 我的模型中有以下结构 class User db Model tablename user user id db Column db String
  • 使用Python请求登录Google帐户

    在多个登录页面上 需要谷歌登录才能继续 我想用requestspython 中的库以便让我自己登录 通常这很容易使用requests库 但是我无法让它工作 我不确定这是否是由于 Google 做出的一些限制 也许我需要使用他们的 API 或
  • YOLOv8获取预测边界框

    我想将 OpenCV 与 YOLOv8 集成ultralytics 所以我想从模型预测中获取边界框坐标 我该怎么做呢 from ultralytics import YOLO import cv2 model YOLO yolov8n pt
  • datetime.datetime.now() 返回旧值

    我正在通过匹配日期查找 python 中的数据存储条目 我想要的是每天选择 今天 的条目 但由于某种原因 当我将代码上传到 gae 服务器时 它只能工作一天 第二天它仍然返回相同的值 例如当我上传代码并在 07 01 2014 执行它时 它
  • 使用 xlrd 打开 BytesIO (xlsx)

    我正在使用 Django 需要读取上传的 xlsx 文件的工作表和单元格 使用 xlrd 应该可以 但因为文件必须保留在内存中并且可能不会保存到我不知道如何继续的位置 本例中的起点是一个带有上传输入和提交按钮的网页 提交后 文件被捕获req
  • 为什么 PyYAML 花费这么多时间来解析 YAML 文件?

    我正在解析一个大约 6500 行的 YAML 文件 格式如下 foo1 bar1 blah name john age 123 metadata whatever1 whatever whatever2 whatever stuff thi
  • Python 2:SMTPServerDisconnected:连接意外关闭

    我在用 Python 发送电子邮件时遇到一个小问题 me my email address you recipient s email address me email protected cdn cgi l email protectio
  • 从Python中的字典列表中查找特定值

    我的字典列表中有以下数据 data I versicolor 0 Sepal Length 7 9 I setosa 0 I virginica 1 I versicolor 0 I setosa 1 I virginica 0 Sepal
  • 在 Sphinx 文档中*仅*显示文档字符串?

    Sphinx有一个功能叫做automethod从方法的文档字符串中提取文档并将其嵌入到文档中 但它不仅嵌入了文档字符串 还嵌入了方法签名 名称 参数 我如何嵌入only文档字符串 不包括方法签名 ref http www sphinx do
  • pyspark 将 twitter json 流式传输到 DF

    我正在从事集成工作spark streaming with twitter using pythonAPI 我看到的大多数示例或代码片段和博客是他们从Twitter JSON文件进行最终处理 但根据我的用例 我需要所有字段twitter J
  • import matplotlib.pyplot 给出 AttributeError: 'NoneType' 对象没有属性 'is_interactive'

    我尝试在 Pycharm 控制台中导入 matplotlib pyplt import matplotlib pyplot as plt 然后作为回报我得到 Traceback most recent call last File D Pr
  • python import inside函数隐藏现有变量

    我在我正在处理的多子模块项目中遇到了一个奇怪的 UnboundLocalError 分配之前引用的局部变量 问题 并将其精简为这个片段 使用标准库中的日志记录模块 import logging def foo logging info fo
  • 如何计算Python中字典中最常见的前10个值

    我对 python 和一般编程都很陌生 所以请友善 我正在尝试分析包含音乐信息的 csv 文件并返回最常听的前 n 个乐队 从下面的代码中 每听一首歌曲都是一个列表中的字典条目 格式如下 album Exile on Main Street
  • cv2.VideoWriter:请求一个元组作为 Size 参数,然后拒绝它

    我正在使用 OpenCV 4 0 和 Python 3 7 创建延时视频 构造 VideoWriter 对象时 文档表示 Size 参数应该是一个元组 当我给它一个元组时 它拒绝它 当我尝试用其他东西替换它时 它不会接受它 因为它说参数不是
  • 使用 z = f(x, y) 形式的 B 样条方法来拟合 z = f(x)

    作为一个潜在的解决方案这个问题 https stackoverflow com questions 76476327 how to avoid creating many binary switching variables in gekk

随机推荐

  • Foreach 循环编辑器中缺少枚举器

    我在一台新笔记本电脑上 在 Visual Studio 2015 中的 SQL Server Integration Services 包上工作 在以前版本的 Visual Studio 中 枚举器配置部分中有几个选项 如下所示 在 VS
  • 邀请朋友加入 Facebook 应用程序 - Android,未发送邀请

    我已经用我的 Android 编码创建了一个 Facebook 活动 现在我想邀请朋友参加 下面是我邀请好友的代码 private void inviteFriends try Bundle params new Bundle params
  • switch 在 Visual C++ 中如何编译?它的优化程度和速度如何?

    我发现我只能在 C 中使用数值switch陈述 我认为它和一堆更深层的区别if else s 因此我问自己 如何switch与 不同if elseif elseif在运行速度 编译时优化和一般编译方面 我这里主要说的是MSVC 开关通常被编
  • Android 应用程序方向更改会重新启动 Activity

    我有一个活动 只要方向改变 它就会重新启动 我编写了代码来防止清单文件中的方向发生变化时活动重新启动 如下所示
  • setTimeout 在 Greasemonkey 中并不总是有效

    我发现了很多类似的问题 但没有一个是平等的 也没有正确的解决方案 这是一个很奇怪的问题 我有一个简单的 Greasemonkey 脚本来测试这个问题 UserScript name testdiddio namespace http use
  • 在同一个图表上绘制两个直方图,并将它们的列总和为 100

    我有两组不同大小的数据 我想将它们绘制在同一个直方图上 然而 由于一组有约 330 000 个值 另一组有约 16 000 个值 因此它们的频率直方图很难比较 我想绘制一个比较两组的直方图 使得 y 轴是该箱中出现的百分比 我下面的代码与此
  • 为什么没有人接受 C# 中的公共字段?

    似乎每个 C 静态分析器在看到公共字段时都会抱怨 但为什么 当然 在某些情况下 公共 或内部 field就足够了 拥有一个拥有它的财产是没有意义的get and set 方法 如果我确定我不会重新定义该字段或添加该字段怎么办 副作用很糟糕
  • Android GCM主题订阅限制

    随着 android gcm 中主题的引入 我正在评估此选项 以简化保持服务器与某些订阅同步所需要做的工作 不过我在文档中读到主题的使用仅限于 100 万个订阅 这是否意味着您不能拥有超过 100 万个用户 具有一个或多个主题 或者您只能订
  • 滚动视图内的 TabHost:单击选项卡时始终向下滚动

    我有一个 Activity 其中 Scrollview 作为顶级元素 里面还有一些其他视图 并且在某些时候还有一个 TabHost 通过查看您可能会有更好的印象截图 http img263 imageshack us img263 5329
  • 未知的 Heroku 错误

    我尝试在 heroku 上运行应用程序时遇到以下错误 2011 06 03T11 24 25 07 00 heroku nginx GET HTTP 1 1 2011 06 03T18 24 37 00 00 heroku router E
  • 在iphone应用程序中的一个线程之后调度一个线程

    我想在线程完成后安排线程 是否可以 如何 例如 指定我的需要 void connection NSURLConnection connection didReceiveData NSData data 1 response schedule
  • 无法启动客户端 Rust 语言服务器

    我正在尝试弄清楚如何使用 WSL 中的 rustc 和 Cargo 我使用 VS Code 和 Rust rls 插件 可以编译我的代码 但 RLS 存在问题 无法启动客户端 Rust 语言服务器 Rustup 不可用 安装自https w
  • 为什么 Github 操作日志显示星号?

    在 Github 操作运行器上运行时 Maven 日志输出仅显示 3 个星号 而不是实际的字符串 警告 home runner work project src test java de persistence dao DaoTest ja
  • C 中的字符计数

    我正在尝试编写一个程序来计算字符串中的所有字符 我本来有它 但后来意识到我无法计算空格 我不明白为什么这不起作用 for m 0 z m 0 m if z m charcount 任何帮助表示赞赏 编辑 如果像这样扫描输入 字符串 会有什么
  • Jenkins email-ext 会针对未按预期工作的罪魁祸首触发电子邮件

    当构建失败时 我试图向罪魁祸首发送电子邮件 如果我手动启动构建 则会发送电子邮件 但如果我通过 SCM 轮询启动构建 则不会发送电子邮件 并且构建的控制台输出会显示以下消息 An attempt to send an e mail to e
  • 如何使用 NPOI 获取包含日期的单元格的值并保留原始格式

    我有一个使用 DevExpress 编辑的 Excel 文件 并且正在使用 NPOI 阅读 当我尝试以字符串形式获取日期单元格的值时 它不会保留原始值 例如 在 DevExpress 网格中我设置了这个值 2016 08 12 我想在字符串
  • 使用 Seaborn FacetGrid 绘制相关热图

    我正在尝试创建一个带有热图的图像 分别表示每个标签的数据点特征的相关性 使用seaborn 我可以为单个类创建热图 如下所示 grouped df groupby target sns heatmap grouped get group C
  • 使用 MVC 通配符证书在 Azure 上托管许多 SSL 站点

    以下应用程序当前在我尝试迁移到 Azure 的 Windows 2008 R2 服务器上运行 Part 1 首先 我有以下 ASP NET MVC 站点 它根据 DNS 名称的第一部分路由客户 https customer1 myAzure
  • 将数字的最后 n 位转换为零

    在Python中 将数字的最后一位数字替换为零并保持前三位数字不变的最佳方法是什么 例子 23456789 gt 23400000 112022 gt 112000 1111 gt 1110 111 gt 111 no conversion
  • Python Asyncio run_forever() 和任务

    我改编了这段代码 以便在异步 Python 中使用 Google Cloud PubSub https github com cloudfind google pubsub asyncio https github com cloudfin