AIORedis 和 PUB/SUB 不是 asnyc

2024-04-29

I used aioredis http://aioredis.readthedocs.org/en/latest/examples.html用于编写异步服务,该服务将侦听某个通道并以异步方式运行一些命令。

基本上我从示例页面 http://aioredis.readthedocs.org/en/latest/examples.html编写一个小型测试应用程序并删除不必要的部分:

import asyncio
import aioredis

async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        print('Got Message:', msg)
        i = int(msg['sleep_for'])
        print('Sleep for {}'.format(i))
        await asyncio.sleep(i)
        print('End sleep')


async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    tsk = await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

还有另一个测试应用程序,它发布带有sleep_for然后在订阅者应用程序中使用该字段来模拟内部的某些工作reader协程使用sleep陈述。

我期望“睡眠”以“并行”方式运行,但实际上它们以同步方式出现在屏幕上,只是一个接一个。

我的猜测是,一旦击中await ch.get_json(..)(或者甚至可能await ch.wait_message())行我应该能够处理下一条消息。在实践中,它像同步代码一样运行。我哪里错了?这可以使用连接池来处理,但这意味着有些东西不是异步的,并且不知道到底是什么。


我的猜测是,一旦点击await ch.get_json(..)(或者甚至await ch.wait_message())行,我应该能够处理下一条消息。

事情不是这样的async/await语法有效。每次你击中一个await在协程中,该协程将被“暂停”,将控制权交给被调用的协程。如果它正在睡眠,它不会自动处理下一条消息。

你应该做的是使用ensure_future在单独的协程中处理每条消息:

import asyncio
import aioredis

async def handle_msg(msg):
    print('Got Message:', msg)
    i = int(msg['sleep_for'])
    print('Sleep for {}'.format(i))
    await asyncio.sleep(i)
    print('End sleep')

async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        asyncio.ensure_future(handle_msg(msg))

async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    tsk = await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close() 
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

AIORedis 和 PUB/SUB 不是 asnyc 的相关文章

随机推荐