我想同时使用ThreadPoolExecutor
from concurrent.futures
和异步函数。
我的程序重复向线程池提交具有不同输入值的函数。在该较大函数中执行的最终任务序列可以按任何顺序,并且我不关心返回值,只关心它们在将来的某个时刻执行。
所以我尝试这样做
async def startLoop():
while 1:
for item in clients:
arrayOfFutures.append(await config.threadPool.submit(threadWork, obj))
wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
其中提交的函数是:
async def threadWork(obj):
bool = do_something() # needs to execute before next functions
if bool:
do_a() # can be executed at any time
do_b() # ^
where do_b
and do_a
是异步函数。问题是我收到错误:TypeError: object Future can't be used in 'await' expression
如果我删除等待,我会收到另一个错误,说我需要添加await
.
我想我可以让一切都使用线程,但我真的不想这样做。
我建议仔细阅读 Python 3异步开发指南 https://docs.python.org/3/library/asyncio-dev.html,特别是“并发和多线程”部分。
示例中的主要概念问题是事件循环是单线程的,因此在线程池中执行异步协程没有意义。事件循环和线程交互有几种方式:
-
每个线程的事件循环。例如:
async def threadWorkAsync(obj):
b = do_something()
if b:
# Run a and b as concurrent tasks
task_a = asyncio.create_task(do_a())
task_b = asyncio.create_task(do_b())
await task_a
await task_b
def threadWork(obj):
# Create run loop for this thread and block until completion
asyncio.run(threadWorkAsync())
def startLoop():
while 1:
arrayOfFutures = []
for item in clients:
arrayOfFutures.append(config.threadPool.submit(threadWork, item))
wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
-
在执行器中执行阻塞代码。这允许您使用异步 future 而不是上面的并发 future。
async def startLoop():
while 1:
arrayOfFutures = []
for item in clients:
arrayOfFutures.append(asyncio.run_in_executor(
config.threadPool, threadWork, item))
await asyncio.gather(*arrayOfFutures)
-
使用线程安全函数将任务提交到跨线程的事件循环。例如,您可以在主线程的运行循环中运行所有异步协程,而不是为每个线程创建运行循环:
def threadWork(obj, loop):
b = do_something()
if b:
future_a = asyncio.run_coroutine_threadsafe(do_a(), loop)
future_b = asyncio.run_coroutine_threadsafe(do_b(), loop)
concurrent.futures.wait([future_a, future_b])
async def startLoop():
loop = asyncio.get_running_loop()
while 1:
arrayOfFutures = []
for item in clients:
arrayOfFutures.append(asyncio.run_in_executor(
config.threadPool, threadWork, item, loop))
await asyncio.gather(*arrayOfFutures)
Note:这个示例不应该按字面意思使用,因为它会导致所有协程在主线程中执行,而线程池工作线程只是阻塞。这只是为了展示一个例子run_coroutine_threadsafe()
method.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)