假设我有一些异步协程,它可以获取一些数据并返回它。像这样:
async def fetch_data(*args):
result = await some_io()
return result
基本上,这个协程是从协程链中调用的,初始协程是通过创建任务来运行的。
但是,如果出于测试目的,我只想在运行某个文件时以这种方式运行一个协程,该怎么办:
if __name__ == '__main__':
result = await fetch_data(*args)
print(result)
显然我不能这样做,因为我试图从非协程函数运行并等待协程。
所以问题是:是否有一些正确的方法可以从协程获取数据而不调用它的函数?
我可以做一些Future
对象为result
等待它,但也许还有其他一些更简单、更清晰的方法?
您将需要创建一个事件循环来运行您的协程:
import asyncio
async def async_func():
return "hello"
loop = asyncio.get_event_loop()
result = loop.run_until_complete(async_func())
loop.close()
print(result)
或者作为一个函数:
def run_coroutine(f, *args, **kwargs):
loop = asyncio.get_event_loop()
result = loop.run_until_complete(f(*args, **kwargs))
loop.close()
return result
像这样使用:
print(run_coroutine(async_func))
Or:
assert "expected" == run_coroutine(fetch_data, "param1", param2="foo")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)