在 aiohttp Web 服务器中使用 ClientSession 的正确方法是什么?

2024-03-23

我有一个网络服务器,大致如下:

async def websocket_handler(request):
    """Serve search result "cards" over a WebSocket, one per text message.

    Every TEXT message is parsed as JSON. When the payload contains a ``"q"``
    key, the whole payload is forwarded as query parameters to
    ``{SEARCH_ROOT}/s`` and the pending card list is replaced by the returned
    ``"results"`` followed by a ``{"done": True}`` marker. After that, the
    next pending card (if any) is popped and sent to the client.

    Any non-TEXT message closes the socket.

    Returns:
        The prepared ``web.WebSocketResponse``.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    # ==========================================
    cards = []
    async for msg in ws:
        if msg.type == aiohttp.WSMsgType.TEXT:
            Q = json.loads(msg.data)
            if "q" in Q:
                # NOTE: a brand-new ClientSession is opened for every query
                # message; the aiohttp FAQ recommends one session per
                # application instead.
                async with aiohttp.ClientSession(cookies=request.cookies) as session:
                    async with session.get(f"{SEARCH_ROOT}/s", params=Q) as resp:
                        doc_order = await resp.json()
                cards = [
                    *doc_order["results"],
                    {"done": True},
                ]
            if cards:
                card = cards.pop(0)
                await ws.send_json(card)
        else:
            # close() is a coroutine: without ``await`` it is never executed
            # and the socket stays open.
            await ws.close()
    return ws

问题是,这段代码运行良好,但在大约 13-15 个新的 websocket 连接之后,我开始看到错误,提示无法启动新线程(RuntimeError: can't start new thread)。

文档提到每个服务器实例应该只创建一个客户端会话(https://aiohttp.readthedocs.io/en/stable/faq.html#how-do-i-manage-a-clientsession-within-a-web-server),但我不知道该怎么做。我有一些想法:

  • 我只创建一个全局会话对象吗?
  • 我是否必须以某种方式使用后台任务(https://docs.aiohttp.org/en/stable/web_advanced.html#background-tasks)?

我得到的确切回溯是这样的:

Traceback (most recent call last):
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/web_protocol.py", line 418, in start      
    resp = await task                                                                                                                   
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/web_app.py", line 458, in _handle         
    resp = await handler(request)                                                                                                       
  File "/home/user/myproject/myproject/api/websocket.py", line 22, in websocket_handler                                                          
    async with aiohttp.request('GET', f"{SEARCH_ROOT}/s", params=Q, cookies=request.cookies) as resp:                                    
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/client.py", line 1043, in __aenter__      
    self._resp = await self._coro                                                                                                       
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/client.py", line 476, in _request         
    timeout=real_timeout                                                                                                                
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/connector.py", line 522, in connect       
    proto = await self._create_connection(req, traces, timeout)                                                                         
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/connector.py", line 854, in _create_connection
    req, traces, timeout)                                                                                                               
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/connector.py", line 955, in _create_direct_connection
    traces=traces), loop=self._loop)                                                                                                    
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/connector.py", line 825, in _resolve_host 
    self._resolver.resolve(host, port, family=self._family)                                                                             
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/resolver.py", line 30, in resolve         
    host, port, type=socket.SOCK_STREAM, family=family)                                                                                 
  File "/usr/lib/python3.6/asyncio/base_events.py", line 681, in getaddrinfo                                                            
    host, port, family, type, proto, flags)
  File "/usr/lib/python3.6/asyncio/base_events.py", line 644, in run_in_executor
    return futures.wrap_future(executor.submit(func, *args), loop=self)
  File "/usr/lib/python3.6/concurrent/futures/thread.py", line 123, in submit
    self._adjust_thread_count()
  File "/usr/lib/python3.6/concurrent/futures/thread.py", line 142, in _adjust_thread_count
    t.start()
  File "/usr/lib/python3.6/threading.py", line 846, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
Unhandled exception
Traceback (most recent call last):
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/web_protocol.py", line 447, in start
    await resp.prepare(request)
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/web_response.py", line 353, in prepare
    return await self._start(request)
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/web_response.py", line 667, in _start
    return await super()._start(request)
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/web_response.py", line 410, in _start
    await writer.write_headers(status_line, headers)
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/http_writer.py", line 112, in write_headers
    self._write(buf)
  File "/home/user/.local/share/virtualenvs/myproject/lib/python3.6/site-packages/aiohttp/http_writer.py", line 67, in _write
    raise ConnectionResetError('Cannot write to closing transport')
ConnectionResetError: Cannot write to closing transport

下面是一个最小工作示例,演示如何为每个 aiohttp 服务器实例(即 app)只创建一个 aiohttp.ClientSession()。

aiohttp.ClientSession() 的实例由 app_factory 函数 [1] 注册的清理上下文(Cleanup Context)[3] 创建。该实例存储在应用程序实例 [2] 中,可以在请求处理程序中通过 request.app['client_session'] 访问,并在应用关闭时由同一个清理上下文 [3] 正确关闭。

import argparse
import logging
from typing import AsyncIterator, Final, NoReturn

import aiohttp
from aiohttp import web
from aiohttp.web_request import Request

# Configure the root logger so DEBUG-level records are emitted.
logging.basicConfig(level=logging.DEBUG)

# Route table filled by the ``@routes.get`` decorators and registered on the
# application inside ``app_factory``.
routes: Final = web.RouteTableDef()
# Module-level logger used by the factory and the cleanup context.
logger: Final = logging.getLogger(__name__)


@routes.get('/')
async def hello(request: Request) -> web.Response:
    """Fetch a sample JSON document upstream and relay it to the caller.

    The outgoing request goes through the ClientSession stored on the
    application under ``'client_session'``, so no session is created per
    request.
    """
    shared_session = request.app['client_session']

    async with shared_session.get("https://httpbin.org/json") as upstream:
        payload = await upstream.json()
        return web.json_response(payload)


async def client_session_ctx(app: web.Application) -> AsyncIterator[None]:
    """
    Cleanup context that creates and properly closes the shared ClientSession.

    This is an async generator registered via ``app.cleanup_ctx``: the code
    before ``yield`` creates the session and stores it under
    ``app['client_session']``; the code after ``yield`` closes it again.
    Because the function yields, its return type is an async iterator, not
    ``NoReturn``.

    Ref.:

        > https://docs.aiohttp.org/en/stable/web_advanced.html#cleanup-context
        > https://docs.aiohttp.org/en/stable/web_advanced.html#aiohttp-web-signals
        > https://docs.aiohttp.org/en/stable/web_advanced.html#data-sharing-aka-no-singletons-please
    """
    logger.debug('Creating ClientSession')
    app['client_session'] = aiohttp.ClientSession()

    # Hand control back to aiohttp until the cleanup phase.
    yield

    logger.debug('Closing ClientSession')
    await app['client_session'].close()


async def app_factory() -> web.Application:
    """
    Build the aiohttp application: routes first, then cleanup contexts.

    See: https://docs.aiohttp.org/en/stable/web_advanced.html
    """
    debug = logger.debug

    debug('[APP Factory] Creating Application (entering APP Factory)')
    application = web.Application()

    debug('[APP Factory] Adding Routes')
    application.add_routes(routes)

    debug('[APP Factory] Registering Cleanup contexts')
    # The cleanup context owns the lifetime of the shared ClientSession.
    application.cleanup_ctx.append(client_session_ctx)

    debug('[APP Factory] APP is now prepared and can be returned by APP Factory')
    return application


if __name__ == '__main__':
    # Command-line entry point: the only option is the listening port.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '--port',
        default=8000,
        type=int,
        help='Port to run HTTP server',
    )

    options = cli.parse_args()
    # run_app accepts the factory coroutine and awaits it itself.
    web.run_app(app_factory(), port=options.port)

[1] https://docs.aiohttp.org/en/stable/web_advanced.html#passing-a-coroutine-into-run-app-and-gunicorn

[2] https://docs.aiohttp.org/en/stable/web_advanced.html#data-sharing-aka-no-singletons-please

[3] https://docs.aiohttp.org/en/stable/web_advanced.html#cleanup-context

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 aiohttp Web 服务器中使用 ClientSession 的正确方法是什么? 的相关文章

随机推荐