如何在不阻塞主线程的情况下“触发并忘记”任务?

2023-12-08

我想到的是一个非常通用的BackgroundTask 类,可以在网络服务器或独立脚本中使用,以安排不需要阻塞的任务。

我不想在这里使用任何任务队列(celery、rabbitmq 等),因为我正在考虑的任务太小且运行速度太快。只是想尽可能地完成它们。这是一种异步方法吗?将它们扔到另一个进程上?

我想出的第一个可行的解决方案:

# Need ParamSpec to get correct type hints in BackgroundTask init
P = ParamSpec("P")


class BackgroundTask(metaclass=ThreadSafeSingleton):
    """Easy way to create a background task that is not dependent on any webserver internals.

    Usage:
        async def sleep(t):
            time.sleep(t)

        BackgroundTask(sleep, 10) <- Creates async task and executes it separately (nonblocking, works with coroutines)
        BackgroundTask(time.sleep, 9) <- Creates async task and executes it separately (nonblocking, works with normal functions)
    """

    background_tasks = set()
    lock = threading.Lock()

    def __init__(self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
        """Uses singleton instance of BackgroundTask to add a task to the async execution queue.

        Args:
            func (typing.Callable[P, typing.Any]): _description_
        """
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            with self.lock:
                task = asyncio.create_task(self.func(*self.args, **self.kwargs))
                self.background_tasks.add(task)
                print(len(self.background_tasks))
                task.add_done_callback(self.background_tasks.discard)

        # TODO: Create sync task (this will follow a similar pattern)


async def create_background_task(func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
    b = BackgroundTask(func, *args, **kwargs)
    await b()


# Usage:
async def sleep(t):
    time.sleep(t)

await create_background_task(sleep, 5)

我认为这样做我错过了重点。如果我将此代码与其他一些异步代码一起运行,那么是的,我会获得性能优势,因为阻塞操作不再阻塞主线程。

我想我可能需要更像一个单独的进程来处理此类后台任务,而根本不会阻塞主线程(上面的异步代码仍将在主线程上运行)。

有一个单独的线程来处理后台作业是否有意义?就像一个简单的作业队列但非常轻量级并且不需要额外的基础设施?

或者创建像上面这样的解决方案有意义吗?

我见过 Starlette 做了这样的事情(https://github.com/encode/starlette/blob/decc5279335f105837987505e3e477463a996f3e/starlette/background.py#L15)但它们在返回响应后等待后台任务。

这使得他们的解决方案依赖于 Web 服务器设计(即发送响应后执行操作就可以了)。我想知道我们是否可以构建一些更通用的东西,您可以在脚本或网络服务器中运行后台任务,而不牺牲性能。

对异步/并发功能不太熟悉,所以不知道如何比较这些解决方案。似乎是一个有趣的问题!

这是我尝试在另一个进程上执行任务时想到的:


class BackgroundTask(metaclass=ThreadSafeSingleton):
    """Easy way to create a background task that is not dependent on any webserver internals.

    Usage:
        async def sleep(t):
            time.sleep(t)

        BackgroundTask(sleep, 10) <- Creates async task and executes it separately (nonblocking, works with coroutines)
        BackgroundTask(time.sleep, 9) <- Creates async task and executes it separately (nonblocking, works with normal functions)
        BackgroundTask(es.transport.close) <- Probably most common use in our codebase
    """

    background_tasks = set()
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
    lock = threading.Lock()

    def __init__(self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
        """Uses singleton instance of BackgroundTask to add a task to the async execution queue.

        Args:
            func (typing.Callable[P, typing.Any]): _description_
        """
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            with self.lock:
                loop = asyncio.get_running_loop()
                with self.executor as pool:
                    result = await loop.run_in_executor(
                        pool, functools.partial(self.func, *self.args, **self.kwargs))


你的问题太抽象了,我会尽力给出所有问题的共同答案。

如何在不阻塞主线程的情况下“触发并忘记”任务?

这取决于你所说的“忘记”是什么意思。

  • 如果您不打算在运行后访问该任务,则可以在并行进程中运行它。
  • 如果主应用程序应该能够访问后台任务,那么您应该拥有事件驱动的架构。在这种情况下,以前称为任务的东西将是服务或微服务。

我不想在这里使用任何任务队列(celery、rabbitmq 等),因为我正在考虑的任务太小且运行速度太快。只是想尽可能地完成它们。这是一种异步方法吗?将它们扔到另一个进程上?

如果它包含循环或其他 CPU 密集型操作,则有权使用子进程。如果任务发出请求(异步),读取文件,记录到stdout,或其他 I/O 绑定操作,那么使用协程或线程是正确的。

有一个单独的线程来处理后台作业是否有意义?就像一个简单的作业队列但非常轻量级并且不需要额外的基础设施?

我们不能只使用线程,因为它可能会被另一个使用 CPU 密集型操作的任务阻塞。相反,我们可以运行后台进程并使用管道、队列和事件在进程之间进行通信。不幸的是,我们无法在进程之间提供复杂的对象,但我们可以提供基本的数据结构来处理后台运行的任务的状态变化。

关于斯塔莱特后台任务

Starlette 是一个轻量级的 ASGI 框架/工具包,非常适合用 Python 构建异步 Web 服务。 (自述文件说明)

它是基于并发的。因此,即使这也不是适用于所有类型任务的通用解决方案。注意:并发不同于并行。

我想知道我们是否可以构建一些更通用的东西,您可以在脚本或网络服务器中运行后台任务,而不牺牲性能。

上述解决方案建议使用后台进程。尽管如此,这仍然取决于应用程序设计,因为您必须执行运行进程(任务)的通信和同步所需的操作(发出事件、向队列添加指示器等)。没有通用的工具可以实现这一点,但有一些取决于具体情况的解决方案。

情况 1 - 任务是异步函数

假设我们有一个request函数应该调用 API,而不阻塞其他任务的工作。另外,我们还有一个sleep不应该阻止任何东西的函数。

import asyncio
import aiohttp


async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            try:
                return await response.json()
            except aiohttp.ContentTypeError:
                return await response.read()


async def sleep(t):
    await asyncio.sleep(t)


async def main():
    background_task_1 = asyncio.create_task(request("https://google.com/"))
    background_task_2 = asyncio.create_task(sleep(5))

    ...  # here we can do even CPU-bound operations

    result1 = await background_task_1

    ...  # use the 'result1', etc.

    await background_task_2


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

在这种情况下,我们使用asyncio.create_task同时运行协程(就像在后台一样)。当然,我们可以在子进程中运行它,但没有理由这样做,因为它会使用更多资源而不提高性能。

情况 2 - 任务是同步函数(I/O 绑定)

与第一种情况(函数已经是异步的)不同,在这种情况下,这些函数是同步的,但不受 CPU 限制(I/O 限制)。这使得能够在线程中运行它们或使它们异步(使用asyncio.to_thread)并同时运行。

import time
import asyncio
import requests


def asynchronous(func):
    """
    This decorator converts a synchronous function to an asynchronous
    
    Usage:
        @asynchronous
        def sleep(t):
            time.sleep(t)
            
        async def main():
            await sleep(5)
    """
    
    async def wrapper(*args, **kwargs):
        await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


@asynchronous
def request(url):
    with requests.Session() as session:
        response = session.get(url)
        try:
            return response.json()
        except requests.JSONDecodeError:
            return response.text


@asynchronous
def sleep(t):
    time.sleep(t)

    
async def main():
    background_task_1 = asyncio.create_task(request("https://google.com/"))
    background_task_2 = asyncio.create_task(sleep(5))
    ...

在这里,我们使用装饰器将同步(I/O 绑定)函数转换为异步函数,并像第一种情况一样使用它们。

情况 3 - 任务是同步函数(受 CPU 限制)

要在后台并行运行 CPU 密集型任务,我们必须使用多处理。为了确保任务完成,我们使用join method.

import time
import multiprocessing


def task():
    for i in range(10):
        time.sleep(0.3)


def main():
    background_task = multiprocessing.Process(target=task)
    background_task.start()

    ...  # do the rest stuff that does not depend on the background task

    background_task.join()  # wait until the background task is done

    ...  # do stuff that depends on the background task


if __name__ == "__main__":
    main()

假设主应用程序依赖于后台任务的部分。在这种情况下,我们需要一个event驱动设计作为join不能多次调用。

import multiprocessing

event = multiprocessing.Event()


def task():
    ...  # synchronous operations

    event.set()  # notify the main function that the first part of the task is done

    ...  # synchronous operations

    event.set()  # notify the main function that the second part of the task is also done

    ...  # synchronous operations


def main():
    background_task = multiprocessing.Process(target=task)
    background_task.start()

    ...  # do the rest stuff that does not depend on the background task

    event.wait()  # wait until the first part of the background task is done

    ...  # do stuff that depends on the first part of the background task

    event.wait()  # wait until the second part of the background task is done

    ...  # do stuff that depends on the second part of the background task

    background_task.join()  # wait until the background task is finally done

    ...  # do stuff that depends on the whole background task


if __name__ == "__main__":
    main()

正如您已经注意到的那样,对于事件,我们只能提供二进制信息,如果进程超过两个,那么这些信息就无效(不可能知道事件是从哪里发出的)。所以我们使用pipes, queues, and manager在进程之间提供非二进制信息。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在不阻塞主线程的情况下“触发并忘记”任务? 的相关文章

  • 从 torch.autograd.gradcheck 导入 zero_gradients

    我想复制代码here https github com LTS4 DeepFool blob master Python deepfool py 并且我在 Google Colab 中运行时收到以下错误 ImportError 无法导入名称
  • App Engine 上的 Django 与 webapp2 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 使用 Boto3 超时的 AWS Lambda 函数

    我已经解决了我自己的问题 但无论如何我都会发布它 希望能节省其他人几个小时 我在 AWS 上有一个无服务器项目 使用 Python 将记录插入到 kinesis 队列中 但是 当我使用 boto3 client kinesis 或 put
  • 使用 Python 和 lmfit 拟合复杂模型?

    我想适合椭偏仪 http en wikipedia org wiki Ellipsometry使用 LMFit 将数据转换为复杂模型 两个测量参数 psi and delta 是复杂函数中的变量rho 我可以尝试将问题分离为实部和虚部共享参
  • Scrapy 文件管道不下载文件

    我的任务是构建一个可以下载所有内容的网络爬虫 pdfs 在给定站点中 Spider 在本地计算机和抓取集线器上运行 由于某种原因 当我运行它时 它只下载一些但不是全部的 pdf 通过查看输出中的项目可以看出这一点JSON 我已经设定MEDI
  • 将整数系列转换为交替(双元)二进制系列

    我不知道如何最好地表达这个问题 因为在这里谷歌搜索和搜索总是让我找到更复杂的东西 我很确定这是基本的东西 但对于我的生活来说 我找不到一个好的方法来做到这一点下列 给定一个整数序列 比如说 for x in range 0 36 我想将这些
  • 使用 Pandas 从 csv 文件读取标题信息

    我有一个包含 14 行标题的数据文件 在标头中 有经纬度坐标和时间的元数据 我目前正在使用 pandas read csv filename delimiter header 14 读取文件 但这只是获取数据 我似乎无法获取元数据 有人知道
  • 如何用函数记录一个文件?

    我有一个带有函数 lib py 但没有类的python 文件 每个函数都有以下样式 def fnc1 a b c This fonction does something param a lalala type a str param b
  • NSUserNotificationCenter.defaultUserNotificationCenter() 使用 PyInstaller 返回 None

    我正在尝试将通知发送到通知中心 Mac OSX 我正在使用 PyObjC 绑定来使用我们的 python 应用程序中的 cocoa api 我正在使用以下代码片段 import Foundation import objc NSUserNo
  • 我可以用关闭的文件对象做什么?

    当您打开文件时 它存储在一个打开的文件对象中 该对象使您可以访问该文件的各种方法 例如读取或写入 gt gt gt f open file0 gt gt gt f
  • 在 Haskell 中创建 100 万个线程需要多长时间?

    据我了解 Haskell 有绿色线程 但它们的重量有多轻 是否可以创建100万个线程 或者 100 000 个线程需要多长时间 from here http www reddit com r programming comments a4n
  • 如何从 python 脚本执行 7zip 命令

    我试图了解如何使用 os system 模块来执行 7zip 命令 现在我不想用 Popen 或 subprocess 让事情变得复杂 我已经安装了 7zip 并将 7zip exe 复制到我的用户文件夹中 我只想提取我的测试文件 inst
  • Pandas 字典键到列[重复]

    这个问题在这里已经有答案了 我有一个像这样的数据框 index column1 e1 u c680 5 u c681 1 u c682 2 u c57 e2 u c680 6 u c681 2 u c682 1 u c57 e3 u c68
  • Python在没有pandas的情况下解码excel表

    我正在尝试在 python 中读取 excel 文件而不使用pandas or xlrd 我一直在尝试将结果转换为bytes to utf 8没有任何成功 xls 文件中的数据 colA colB colC spc 1D0 20190705
  • Snakemake:将多个输入用于具有多个子组的一个输出的规则

    我有一个工作管道 用于下载 比对和对公共测序数据执行变体调用 问题是它目前只能在每个样本的基础上工作 i e作为每个单独测序实验的样本 如果我想对一组实验 例如样本的生物和 或技术复制 执行变体调用 则它不起作用 我试图解决它 但我无法让它
  • 线程安全的 C++ 堆栈

    我是 C 新手 正在编写一个多线程应用程序 不同的编写者将对象推入堆栈 读者将它们从堆栈中拉出 或至少将指针推入对象 C 中是否有任何内置结构可以在不添加锁定代码等的情况下处理此问题 如果没有 那么 Boost 库呢 EDIT 你好 感谢您
  • 获取多个同名请求参数

    我的问题是给定的代码 from flask import Flask request app Flask name app route def hello return str request values get param None a
  • 如何从邻接表高效创建稀疏邻接矩阵?

    我正在与last fm http labrosa ee columbia edu millionsong lastfm数据集来自百万歌曲数据集 http labrosa ee columbia edu millionsong 数据以一组 j
  • 类返回语句不打印任何输出

    我正在学习课程 但遇到了问题return语句 它是语句吗 我希望如此 程序什么也没有打印出来 它只是结束而不做任何事情 class className def createName self name self name name def
  • 为什么从 openAI 导入 Universe 模块时出现“无效语法”错误

    当我导入时universe来自 openAI 的模块 我收到以下错误 Traceback most recent call last File

随机推荐

  • 改变 CGRect (或任何结构)?

    我在我的代码中做了很多这样的事情 self sliderOne frame CGRectMake newX 0 self sliderOne frame size width self sliderOne frame size height
  • 卷积神经网络输出所有标签的相同概率

    我目前正在 MNIST 上训练 CNN 随着训练的进行 输出概率 softmax 给出 0 1 0 1 0 1 初始值不统一 所以我不知道我是否在这里做了一些愚蠢的事情 我只训练了15步 只是为了看看训练进展如何 尽管这个数字很低 但我认为
  • Swift:无法以编程方式编辑自定义表格单元格的属性

    我正在尝试更改自定义表格单元格内视图的颜色 并且我有一个可以使用的出口 我可以更改此视图的其他属性 例如 isHidden but backgroundColor似乎不起作用 知道我做错了什么吗 UIColor 名为 绿色 适用于应用程序的
  • 如何在 swift 中将双精度型转换为字节数组?

    我知道如何在java中做到这一点 参见here 但我找不到 java 的 ByteBuffer 的快速等效项 因此找不到它的 putDouble double value 方法 基本上 我正在寻找这样的函数 typealias Byte U
  • 有没有办法在每个页面上打印网页页眉/页脚?

    根据我的研究 似乎我想做的事情是不可能的 但以防万一发生了变化 我想检查一下是否有人想出了一种方法来做到这一点 我有一个网络应用程序 可以根据浏览器窗口中的用户选择生成打印报告 我有一个自定义页眉和页脚 当从浏览器打印报告时 应在每个打印页
  • 在 R 中使用 ggplot2 创建“雷达图”(又名星图;蜘蛛图)

    我想创建一个如下图所示的图 我知道我可以使用radarchart函数来自fmsb包裹 我怀疑是否ggplot2可以用极坐标这样做吗 谢谢 首先 我们加载一些包 library reshape2 library ggplot2 library
  • JS:如何在没有 XMLHttpRequest 的情况下对本地文件进行 Base64 编码?

    我正在尝试对本地文件进行 Base64 编码 它就在我的旁边 js文件 因此不会进行上传 解决方案如this using XMLHttpRequest 出现跨站点脚本错误 我正在尝试这样的事情 这不起作用 但它可能有助于解释我的问题 var
  • 获取 MySQL 数据库列表和服务器版本?

    我的 MySQL 连接字符串是 Server localhost User ID root Password 123 pooling yes charset utf8 DataBase 我的问题是 我应该编写什么查询来获取存在的数据库名称
  • Qt:重叠半透明QgraphicsItem

    我使用 QGraphicsView 一段时间了 我面临着一个先决条件 我不确定使用这个框架是否可以满足它 尽可能简单地说 我有 2 个重叠的 RectItem 和一个半透明的 QBrush 两者相同 是否可以防止重叠区域变得更加不透明 我只
  • Google Drive 使用 javascript 断点续传

    我正在尝试使用以下方式将文件上传到 Google 云端硬盘适用于 JavaScript 的 Google API 客户端库 and 可断点上传类型 我成功进行了身份验证并获取了上传 URI 但在发送实际数据时遇到了问题 如果文件仅包含 AS
  • 仅针对更改的文件增量构建 VSTS (Wordpress)

    我有一个用于 Wordpress php 文件的构建和发布管道 但我只想为源代码管理中更改的文件创建构建 我没有使用 Visual Studio 构建任务 因为我猜这不适用于 php 文件 以下是管道的配置方式 目前根据build我仅使用来
  • 如何从可能包含或不包含 null 的缓冲区构造“std::string”?

    我有一个缓冲区 为简单起见 假设是一个固定大小的数组 char我想建造一个std string从 所述缓冲区可以是空终止的或者其内容可以运行到并包括最后一个字符 如果缓冲区确实包含一个或多个空值 它们不应该出现在结果中string 复制应该
  • 如何在Android应用程序中的单个数据库中创建两个表?

    我已经在 Sqlite Android 应用程序中创建了一个数据库 并尝试在数据库中添加两个表 但创建该数据库时遇到问题 仅创建了第一个表 有谁能够帮助我 package com android cdtech import java sql
  • Java 中的迭代器

    什么是迭代器和集合 这两者有什么关系吗 the interface definition Interface Iterator boolean hasNext Object next note one way traffic void re
  • 使用 php 读取波斯语(Unicode 字符)文本文件

    我正在借助以下代码阅读一个波斯语文本文件 使用 PHP Reading the file name and the book UTF 8 if file exists SourceDirectoryFile NameBook name tx
  • 网格项内容的基线是如何确定的?

    这真的让我很困惑 请参阅下面的两个演示 div style display grid div span First row name span div div div div div div
  • 当依赖项/下游(如第 3 方 API)失败时要使用什么 HTTP 状态代码?

    我们的 API 中有一条路由 当调用时 会命中另一个第 3 方 API e g HTTP GET account 1 这会从我们的数据库和 比如说 第三方 api 例如 Auth0 Okta SalesForce whatever 返回一些
  • 结构体中的匿名联合不在 c99 中?

    这是我遇到的问题的非常简化的代码 enum node type t int t double struct int node int value struct double node double value struct node enu
  • 执行路径搜索?

    我想从我的代码执行一个程序 并为其提供环境变量和参数 亚洲信息通信技术协会 execve是正确的选择 But execve收到一个path论证 不是一个filename 这意味着它期望第一个参数是可执行文件的路径 我知道我可以解析 PATH
  • 如何在不阻塞主线程的情况下“触发并忘记”任务?

    我想到的是一个非常通用的BackgroundTask 类 可以在网络服务器或独立脚本中使用 以安排不需要阻塞的任务 我不想在这里使用任何任务队列 celery rabbitmq 等 因为我正在考虑的任务太小且运行速度太快 只是想尽可能地完成