你的问题太抽象了,我会尽力给出所有问题的共同答案。
如何在不阻塞主线程的情况下“触发并忘记”任务?
这取决于你所说的“忘记”是什么意思。
- 如果您不打算在运行后访问该任务,则可以在并行进程中运行它。
- 如果主应用程序应该能够访问后台任务,那么您应该拥有事件驱动的架构。在这种情况下,以前称为任务的东西将是服务或微服务。
我不想在这里使用任何任务队列(celery、rabbitmq 等),因为我正在考虑的任务太小且运行速度太快。只是想尽可能地完成它们。这是一种异步方法吗?将它们扔到另一个进程上?
如果它包含循环或其他 CPU 密集型操作,则有权使用子进程。如果任务发出请求(异步),读取文件,记录到stdout
,或其他 I/O 绑定操作,那么使用协程或线程是正确的。
有一个单独的线程来处理后台作业是否有意义?就像一个简单的作业队列但非常轻量级并且不需要额外的基础设施?
我们不能只使用线程,因为它可能会被另一个使用 CPU 密集型操作的任务阻塞。相反,我们可以运行后台进程并使用管道、队列和事件在进程之间进行通信。不幸的是,我们无法在进程之间提供复杂的对象,但我们可以提供基本的数据结构来处理后台运行的任务的状态变化。
关于斯塔莱特和后台任务
Starlette 是一个轻量级的 ASGI 框架/工具包,非常适合用 Python 构建异步 Web 服务。 (自述文件说明)
它是基于并发的。因此,即使这也不是适用于所有类型任务的通用解决方案。注意:并发不同于并行。
我想知道我们是否可以构建一些更通用的东西,您可以在脚本或网络服务器中运行后台任务,而不牺牲性能。
上述解决方案建议使用后台进程。尽管如此,这仍然取决于应用程序设计,因为您必须执行运行进程(任务)的通信和同步所需的操作(发出事件、向队列添加指示器等)。没有通用的工具可以实现这一点,但有一些取决于具体情况的解决方案。
情况 1 - 任务是异步函数
假设我们有一个request
函数应该调用 API,而不阻塞其他任务的工作。另外,我们还有一个sleep
不应该阻止任何东西的函数。
import asyncio
import aiohttp
async def request(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
try:
return await response.json()
except aiohttp.ContentTypeError:
return await response.read()
async def sleep(t):
await asyncio.sleep(t)
async def main():
background_task_1 = asyncio.create_task(request("https://google.com/"))
background_task_2 = asyncio.create_task(sleep(5))
... # here we can do even CPU-bound operations
result1 = await background_task_1
... # use the 'result1', etc.
await background_task_2
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
在这种情况下,我们使用asyncio.create_task同时运行协程(就像在后台一样)。当然,我们可以在子进程中运行它,但没有理由这样做,因为它会使用更多资源而不提高性能。
情况 2 - 任务是同步函数(I/O 绑定)
与第一种情况(函数已经是异步的)不同,在这种情况下,这些函数是同步的,但不受 CPU 限制(I/O 限制)。这使得能够在线程中运行它们或使它们异步(使用asyncio.to_thread)并同时运行。
import time
import asyncio
import requests
def asynchronous(func):
"""
This decorator converts a synchronous function to an asynchronous
Usage:
@asynchronous
def sleep(t):
time.sleep(t)
async def main():
await sleep(5)
"""
async def wrapper(*args, **kwargs):
await asyncio.to_thread(func, *args, **kwargs)
return wrapper
@asynchronous
def request(url):
with requests.Session() as session:
response = session.get(url)
try:
return response.json()
except requests.JSONDecodeError:
return response.text
@asynchronous
def sleep(t):
time.sleep(t)
async def main():
background_task_1 = asyncio.create_task(request("https://google.com/"))
background_task_2 = asyncio.create_task(sleep(5))
...
在这里,我们使用装饰器将同步(I/O 绑定)函数转换为异步函数,并像第一种情况一样使用它们。
情况 3 - 任务是同步函数(受 CPU 限制)
要在后台并行运行 CPU 密集型任务,我们必须使用多处理。为了确保任务完成,我们使用join method.
import time
import multiprocessing
def task():
for i in range(10):
time.sleep(0.3)
def main():
background_task = multiprocessing.Process(target=task)
background_task.start()
... # do the rest stuff that does not depend on the background task
background_task.join() # wait until the background task is done
... # do stuff that depends on the background task
if __name__ == "__main__":
main()
假设主应用程序依赖于后台任务的部分。在这种情况下,我们需要一个event驱动设计作为join
不能多次调用。
import multiprocessing
event = multiprocessing.Event()
def task():
... # synchronous operations
event.set() # notify the main function that the first part of the task is done
... # synchronous operations
event.set() # notify the main function that the second part of the task is also done
... # synchronous operations
def main():
background_task = multiprocessing.Process(target=task)
background_task.start()
... # do the rest stuff that does not depend on the background task
event.wait() # wait until the first part of the background task is done
... # do stuff that depends on the first part of the background task
event.wait() # wait until the second part of the background task is done
... # do stuff that depends on the second part of the background task
background_task.join() # wait until the background task is finally done
... # do stuff that depends on the whole background task
if __name__ == "__main__":
main()
正如您已经注意到的那样,对于事件,我们只能提供二进制信息,如果进程超过两个,那么这些信息就无效(不可能知道事件是从哪里发出的)。所以我们使用pipes, queues, and manager在进程之间提供非二进制信息。