深度复制嵌套可迭代(或改进的 itertools.tee 用于可迭代的可迭代)

2024-03-13

Preface

我有一个测试,我正在使用嵌套迭代(通过嵌套迭代我的意思是仅可迭代作为元素)。

作为测试级联考虑

from itertools import tee
from typing import (Any,
                    Iterable)


def foo(nested_iterable: Iterable[Iterable[Any]]) -> Any:
    ...


def test_foo(nested_iterable: Iterable[Iterable[Any]]) -> None:
    original, target = tee(nested_iterable)  # this doesn't copy iterators elements

    result = foo(target)

    assert is_contract_satisfied(result, original)


def is_contract_satisfied(result: Any,
                          original: Iterable[Iterable[Any]]) -> bool:
    ...

E.g. foo可能是简单的恒等函数

def foo(nested_iterable: Iterable[Iterable[Any]]) -> Iterable[Iterable[Any]]:
    return nested_iterable

契约只是检查扁平化的迭代是否具有相同的元素

from itertools import (chain,
                       starmap,
                       zip_longest)
from operator import eq
...
flatten = chain.from_iterable


def is_contract_satisfied(result: Iterable[Iterable[Any]],
                          original: Iterable[Iterable[Any]]) -> bool:
    return all(starmap(eq,
                       zip_longest(flatten(result), flatten(original),
                                   # we're assuming that ``object()``
                                   # will create some unique object
                                   # not presented in any of arguments
                                   fillvalue=object())))

但如果其中一些nested_iterableelements 是一个迭代器,它可能会被耗尽,因为tee正在制作浅拷贝,而不是深拷贝,即对于给定的foo and is_contract_satisfied下一个声明

>>> test_foo([iter(range(10))])

导致可预测的

Traceback (most recent call last):
  ...
    test_foo([iter(range(10))])
  File "...", line 19, in test_foo
    assert is_contract_satisfied(result, original)
AssertionError

Problem

如何深度复制任意嵌套的可迭代对象?

Note

我知道copy.deepcopy功能 https://docs.python.org/3/library/copy.html#copy.deepcopy,但它不适用于文件对象。


朴素的解决方案

简单的算法是

  1. 对原始嵌套可迭代执行按元素复制。
  2. Make n元素复制的副本。
  3. 获取与每个独立副本相关的坐标。

这可以像这样实现

from itertools import tee
from operator import itemgetter
from typing import (Any,
                    Iterable,
                    Tuple,
                    TypeVar)

Domain = TypeVar('Domain')


def copy_nested_iterable(nested_iterable: Iterable[Iterable[Domain]],
                         *,
                         count: int = 2
                         ) -> Tuple[Iterable[Iterable[Domain]], ...]:
    def shallow_copy(iterable: Iterable[Domain]) -> Tuple[Iterable[Domain], ...]:
        return tee(iterable, count)

    copies = shallow_copy(map(shallow_copy, nested_iterable))
    return tuple(map(itemgetter(index), iterables)
                 for index, iterables in enumerate(copies))

Pros:

  • 很容易阅读和解释。

Cons:

  • 如果我们想以更高的嵌套级别扩展我们的可迭代方法(例如嵌套可迭代的可迭代等等),那么这种方法看起来没有帮助。

我们可以做得更好。

改进的解决方案

如果我们看一下itertools.tee功能文档 https://docs.python.org/3/library/itertools.html#itertools.tee,它包含Python配方,在以下帮助下functools.singledispatch装饰者 https://docs.python.org/3/library/functools.html#functools.singledispatch可以重写为

from collections import (abc,
                         deque)
from functools import singledispatch
from itertools import repeat
from typing import (Iterable,
                    Tuple,
                    TypeVar)

Domain = TypeVar('Domain')


@functools.singledispatch
def copy(object_: Domain,
         *,
         count: int) -> Iterable[Domain]:
    raise TypeError('Unsupported object type: {type}.'
                    .format(type=type(object_)))

# handle general case
@copy.register(object)
# immutable strings represent a special kind of iterables
# that can be copied by simply repeating
@copy.register(bytes)
@copy.register(str)
# mappings cannot be copied as other iterables
# since they are iterable only by key
@copy.register(abc.Mapping)
def copy_object(object_: Domain,
                *,
                count: int) -> Iterable[Domain]:
    return itertools.repeat(object_, count)


@copy.register(abc.Iterable)
def copy_iterable(object_: Iterable[Domain],
                  *,
                  count: int = 2) -> Tuple[Iterable[Domain], ...]:
    iterator = iter(object_)
    # we are using `itertools.repeat` instead of `range` here
    # due to efficiency of the former
    # more info at
    # https://stackoverflow.com/questions/9059173/what-is-the-purpose-in-pythons-itertools-repeat/9098860#9098860
    queues = [deque() for _ in repeat(None, count)]

    def replica(queue: deque) -> Iterable[Domain]:
        while True:
            if not queue:
                try:
                    element = next(iterator)
                except StopIteration:
                    return
                element_copies = copy(element,
                                           count=count)
                for sub_queue, element_copy in zip(queues, element_copies):
                    sub_queue.append(element_copy)
            yield queue.popleft()

    return tuple(replica(queue) for queue in queues)

Pros:

  • 处理更深层次的嵌套,甚至是同一级别上的可迭代元素和不可迭代元素等混合元素,
  • 可以扩展为用户定义的结构(例如,用于制作它们的独立深层副本)。

Cons:

  • 可读性较差(但据我们所知“实用性胜过纯粹性” https://www.python.org/dev/peps/pep-0020/#the-zen-of-python),
  • 提供了一些与调度相关的开销(但没关系,因为它基于字典查找,其中有O(1)复杂)。

Test

准备

让我们定义嵌套迭代如下

nested_iterable = [range(10 ** index) for index in range(1, 7)]

由于迭代器的创建没有提及底层副本的性能,因此让我们定义迭代器耗尽的函数(描述here https://mail.python.org/pipermail/python-ideas/2013-September/023488.html)

exhaust_iterable = deque(maxlen=0).extend

Time

Using timeit package

import timeit

def naive(): exhaust_iterable(copy_nested_iterable(nested_iterable))

def improved(): exhaust_iterable(copy_iterable(nested_iterable))

print('naive approach:', min(timeit.repeat(naive)))
print('improved approach:', min(timeit.repeat(improved)))

我的笔记本电脑运行 Windows 10 x64,运行 Python 3.5.4

naive approach: 5.1863865
improved approach: 3.5602296000000013

Memory

Using memory_profiler package https://pypi.org/project/memory-profiler/

Line #    Mem usage    Increment   Line Contents
================================================
    78     17.2 MiB     17.2 MiB   @profile
    79                             def profile_memory(nested_iterable: Iterable[Iterable[Any]]) -> None:
    80     68.6 MiB     51.4 MiB       result = list(flatten(flatten(copy_nested_iterable(nested_iterable))))

对于“天真的”方法和

Line #    Mem usage    Increment   Line Contents
================================================
    78     17.2 MiB     17.2 MiB   @profile
    79                             def profile_memory(nested_iterable: Iterable[Iterable[Any]]) -> None:
    80     68.7 MiB     51.4 MiB       result = list(flatten(flatten(copy_iterable(nested_iterable))))

为“改进”之一。

Note:我已经运行了不同的脚本,因为立即运行它们不会具有代表性,因为第二条语句将重用之前在后台创建的内容int对象。


结论

正如我们所看到的,这两个函数具有相似的性能,但最后一个函数支持更深层次的嵌套,并且看起来非常可扩展。

广告

我添加了“改进”的解决方案lz package https://pypi.org/project/lz from 0.4.0可以像这样使用的版本

>>> from lz.replication import replicate
>>> iterable = iter(range(5))
>>> list(map(list, replicate(iterable,
                             count=3)))
[[0, 1, 2, 3, 4], [0, 1, 2, 3, 4], [0, 1, 2, 3, 4]]

它是基于属性的测试,使用hypothesis框架 https://hypothesis.works/,所以我们可以确定它按预期工作。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

深度复制嵌套可迭代(或改进的 itertools.tee 用于可迭代的可迭代) 的相关文章

随机推荐