Python 3 asyncio 与 aioboto3 似乎是连续的

2023-11-25

我正在将一个简单的 python 3 脚本移植到 AWS Lambda。 该脚本很简单:它从十几个 S3 对象收集信息并返回结果。

使用的脚本multiprocessing.Pool并行收集所有文件。尽管multiprocessing不能在 AWS Lambda 环境中使用,因为/dev/shm不见了。 所以我想与其写脏话multiprocessing.Process / multiprocessing.Queue更换,我会尝试asyncio反而。

我正在使用最新版本aioboto3(8.0.5) 在 Python 3.8 上。

我的问题是,我似乎无法在文件的天真顺序下载和异步事件循环复用下载之间获得任何改进。

这是我的代码的两个版本。

import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import boto3
import aioboto3

BUCKET = 'some-bucket'
KEYS = [
    'some/key/1',
    [...]
    'some/key/10',
]

async def download_aio():
    """Concurrent download of all objects from S3"""
    async with aioboto3.client('s3') as s3:
        objects = [s3.get_object(Bucket=BUCKET, Key=k) for k in KEYS]
        objects = await asyncio.gather(*objects)
        buffers = await asyncio.gather(*[o['Body'].read() for o in objects])

def download():
    """Sequentially download all objects from S3"""
    s3 = boto3.client('s3')
    for key in KEYS:
        object = s3.get_object(Bucket=BUCKET, Key=key)
        object['Body'].read()

def run_sequential():
    download()

def run_concurrent():
    loop = asyncio.get_event_loop()
    #loop.set_default_executor(ProcessPoolExecutor(10))
    #loop.set_default_executor(ThreadPoolExecutor(10))
    loop.run_until_complete(download_aio())

两者的时间安排run_sequential() and run_concurrent()非常相似(十几个 10MB 文件大约需要 3 秒)。 我确信并发版本不是,原因有多种:

  • 我尝试切换到Process/ThreadPoolExecutor,并且我在函数持续时间内生成了进程/线程,尽管它们什么也没做
  • 顺序和并发之间的时序非常接近,尽管我的网络接口绝对没有饱和,并且CPU也没有绑定
  • 并发版本所花费的时间随着文件数量的增加而线性增加。

我确信缺少了一些东西,但我就是无法弄清楚到底缺少什么。

有任何想法吗?


在花费了几个小时试图了解如何使用之后aioboto3正确的是,我决定切换到我的备份解决方案。 我最终推出了我自己的幼稚版本multiprocessing.Pool用于在 AWS lambda 环境中使用。

如果将来有人偶然发现这个线程,就在这里。它远非完美,但很容易更换multiprocessing.Pool对于我的简单案例来说按原样。

from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


class Pool:
    """Naive implementation of a process pool with mp.Pool API.

    This is useful since multiprocessing.Pool uses a Queue in /dev/shm, which
    is not mounted in an AWS Lambda environment.
    """

    def __init__(self, process_count=1):
        assert process_count >= 1
        self.process_count = process_count

    @staticmethod
    def wrap_pipe(pipe, index, func):
        def wrapper(args):
            try:
                result = func(args)
            except Exception as exc:  # pylint: disable=broad-except
                result = exc
            pipe.send((index, result))
        return wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def map(self, function, arguments):
        pending = list(enumerate(arguments))
        running = []
        finished = [None] * len(pending)
        while pending or running:
            # Fill the running queue with new jobs
            while len(running) < self.process_count:
                if not pending:
                    break
                index, args = pending.pop(0)
                pipe_parent, pipe_child = Pipe(False)
                process = Process(
                    target=Pool.wrap_pipe(pipe_child, index, function),
                    args=(args, ))
                process.start()
                running.append((index, process, pipe_parent))
            # Wait for jobs to finish
            for pipe in wait(list(map(lambda t: t[2], running))):
                index, result = pipe.recv()
                # Remove the finished job from the running list
                running = list(filter(lambda x: x[0] != index, running))
                # Add the result to the finished list
                finished[index] = result

        return finished
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python 3 asyncio 与 aioboto3 似乎是连续的 的相关文章

随机推荐