如何测量异步发电机所花费的时间?

2024-05-07

我想测量生成器花费的时间(阻塞主循环的时间)。

假设我有以下两个生成器:

async def run():
    for i in range(5):
        await asyncio.sleep(0.2)
        yield i
    return

async def walk():
    for i in range(5):
        time.sleep(0.3)
        yield i
    return

我想测量一下run花费了周围0.0s每次迭代,同时walk至少使用过0.3s.

我想用类似的东西this https://stackoverflow.com/questions/73028924/how-to-measure-time-spent-in-blocking-code-while-using-asyncio-in-python,但无法让它为我工作。

澄清:

我想排除花在任何事情上的时间await部分。如果由于任何原因协程被停止,那么我不想考虑这个时间。


所以 - 一件事(虽然有点)是测量常规协同例程中的时间 - 我用装饰器来完成它。

然而,当你进一步进入异步生成器时,它是另一个野兽 - 我仍在试图弄清楚 - 它是公开公开的异步迭代器方法的混合(__anext__, asend等...)在返回的对象中使用传统的迭代器方法,我还无法弄清楚(我刚刚打开 PEP 525 看看我是否能理解它)。

至于常规的协同例程,有一个问题:如果你创建一个可等待的类(我的装饰器),asyncio 将要求其__await__方法返回带有 a 的内容__next__方法,该方法将被调用。但原生 Python 协程没有__next__:异步调用send()在这些上 - 所以我必须进行“转置”以便能够测量时间(在协同例程中)。

import asyncio
import time


class ProfileCoro:
    def __init__(self, corofunc):
        self.corofunc = corofunc

    def __call__(self, *args, **kw):
        # WARNING: not parallel execution-safe: fix by
        # keeping "self.ellapsed" in a proper contextvar
        self.ellapsed = 0
        self.coro = self.corofunc(*args, **kw)
        return self

    def __await__(self):
        return self

    def __iter__(self):
        return self

    def __next__(self):
        return self.send(None)

    def throw(self, exc):
        print(f"Arghh!, got an {exc}")
        self.coro.throw(exc)

    def send(self, arg):
        start = time.monotonic()
        try:
            result = self.coro.send(arg)
        except StopIteration:
            duration = time.monotonic() - start
            self.ellapsed += duration
            print(f"Ellapsed time in execution of {self.corofunc.__name__}: {self.ellapsed:.05f}s")
            raise
        duration = time.monotonic() - start
        self.ellapsed += duration
        return result

    def __repr__(self):
        return f"<ProfileCoro wrapper for {self.corofunc}>"

@ProfileCoro
async def run():
    for i in range(5):
        await asyncio.sleep(0.2)
        # yield i
    return 1

@ProfileCoro
async def walk():
    for i in range(5):
        time.sleep(0.3)
        #yield i
    return 3


async def main():
    await run()
    await walk()
    return

asyncio.run(main())

To maybe如果我能弄清楚如何包装异步生成器,请继续。

(我认为大多数现有的分析工具都使用该语言中可用的工具来进行调试器和跟踪(通过sys.settrace():一切都在回调中“可见”,并且不用担心包装异步机制和异步循环发出的所有内部调用)

... 因此,这里是用于捕获异步生成器中的时间的代码。

它将得到幸福的道路 - 如果有复杂的等待类,实现或利用asend, athrow,这不行 - 但对于一个简单的异步生成器函数插件来说async for声明它现在有效:

免责声明:下面的代码中可能有未使用的代码,甚至未使用的状态 - 我来回了相当多的时间来让它工作(其中很多是因为我没有尝试过这一事实)__anext__本身必须是异步的)。尽管如此,事情还是这样:

import asyncio
import time

from functools import wraps


async def _a():
    yield 1

async_generator_asend_type = type(_a().__anext__())


class ProfileCoro:
    def __init__(self, corofunc):
        self.corofunc = corofunc
        self.measures = 0

    def measure_ellapsed(func):
        @wraps(func)
        def wrapper(self, *args, **kw):
            self.measures += 1
            if self.measures > 1:
                try:
                    return func(self, *args, **kw)
                finally:
                    self.measures -= 1
            start = time.monotonic()
            try:
                result = func(self, *args, **kw)
            except (StopIteration, StopAsyncIteration):
                self.ellapsed += time.monotonic() - start
                #self.print_ellapsed()
                raise
            finally:
                self.measures -= 1
            self.ellapsed += time.monotonic() - start

            return result
        return wrapper

    def print_ellapsed(self):
        name = getattr(self.corofunc, "__name__", "inner_iterator")
        print(f"Ellapsed time in execution of {name}: {self.ellapsed:.05f}s")

    def __call__(self, *args, **kw):
        # WARNING: not parallel execution-safe: fix by
        # keeping "self.ellapsed" in a proper contextvar
        self.ellapsed = 0
        self.measures = 0
        if not isinstance(self.corofunc, async_generator_asend_type):
            self.coro = self.corofunc(*args, **kw)
        else:
            self.coro = self.corofunc
        return self


    def __await__(self):
        return self

    def __iter__(self):
        return self

    @measure_ellapsed
    def __next__(self):
        target = self.coro
        if hasattr(target, "__next__"):
            return target.__next__()
        elif hasattr(target, "send"):
            return target.send(None)

    async def athrow(self, exc):
        print(f"Arghh!, got an async-iter-mode {exc}")
        return await self.async_iter.athrow(exc)

    def throw(self, exc):
        print(f"Arghh!, got an {exc}")
        self.coro.throw(exc)

    @measure_ellapsed
    def send(self, arg):
        return self.coro.send(arg)

    def __aiter__(self):
        return self

    #async def asend(self, value):
        ...

    async def aclose(self):
        return await self.async_iter.close()

    def close(self):
        return self.async_iter.close()

    async def __anext__(self):
        if not hasattr(self, "async_iter"):
            self.async_iter = aiter(self.coro)
        self.inner_profiler = ProfileCoro(self.async_iter.__anext__())
        #start = time.monotonic()
        try:
            result = await self.inner_profiler()
        except StopAsyncIteration:
            #self.print_ellapsed()
            raise
        finally:
            self.ellapsed += self.inner_profiler.ellapsed
        return result

    def __repr__(self):
        return f"<ProfileCoro wrapper for {self.corofunc}>"

@ProfileCoro
async def run():
    for i in range(5):
        await asyncio.sleep(0.05)
        # yield i
    return 1

@ProfileCoro
async def walk():
    for i in range(5):
        time.sleep(0.05)
        #yield i
    return 3


@ProfileCoro
async def irun():
    for i in range(5):
        await asyncio.sleep(0.05)
        yield i

@ProfileCoro
async def iwalk():
    for i in range(5):
        time.sleep(0.05)
        yield i


async def main():
    await run()
    run.print_ellapsed()
    await walk()
    walk.print_ellapsed()
    async for _ in irun():
        print(".", end="", flush=True)
    irun.print_ellapsed()
    async for _ in iwalk():
        pass
    iwalk.print_ellapsed()
    return

asyncio.run(main())

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何测量异步发电机所花费的时间? 的相关文章