Python 多处理:全局对象未正确复制到子级

2024-05-25

前几天我回答了一个关于SO的问题 https://stackoverflow.com/q/67047533/1925388关于并行读取 tar 文件。

这是问题的要点:

import bz2
import tarfile
from multiprocessing import Pool

tr = tarfile.open('data.tar')

def clean_file(tar_file_entry):
    if '.bz2' not in str(tar_file_entry):
        return
    with tr.extractfile(tar_file_entry) as bz2_file:
        with bz2.open(bz2_file, "rt") as bzinput:
            # Reading bz2 file
            ....
            .... 


def process_serial():
    members = tr.getmembers()
    processed_files = []
    for i, member in enumerate(members):
        processed_files.append(clean_file(member))
        print(f'done {i}/{len(members)}')


def process_parallel():
    members = tr.getmembers()
    with Pool() as pool:
        processed_files = pool.map(clean_file, members)
        print(processed_files)


def main():
    process_serial() # No error
    process_parallel() # Error


if __name__ == '__main__':
    main()

我们只需在子进程中而不是在父进程中打开 tar 文件,就可以使错误消失,如中所述答案 https://stackoverflow.com/a/67080067/1925388.

我无法理解为什么这有效。

即使我们在父进程中打开 tarfile,子进程也会得到一个新的副本。 那么为什么在子进程中打开 tarfile 会显式地产生任何影响呢?

这是否意味着在第一种情况下,子进程以某种方式改变了公共 tarfile 对象并由于并发写入而导致内存损坏?


FWIW,评论中的答案open在类 UNIX 系统上关于文件句柄号实际上是不正确的。

If multiprocessing uses fork()(它在 Linux 和类似的情况下执行此操作,尽管我读到在 macOS 上分叉存在问题),文件句柄和其他所有内容都愉快地复制到子进程(“愉快”我的意思是在许多边缘情况下它很复杂,例如分叉线程,但对于文件句柄仍然可以正常工作)。

以下对我来说效果很好:

import multiprocessing

this = open(__file__, 'r')


def read_file():
    print(len(this.read()))


def main():
    process = multiprocessing.Process(target=read_file)
    process.start()
    process.join()


if __name__ == '__main__':
    main()

问题很可能是tarfile在读取时具有内部结构和/或缓冲,您也可以通过尝试同时查找和读取同一存档的不同部分来简单地遇到冲突。也就是说,我推测在这种情况下使用没有任何同步的线程池可能会遇到完全相同的问题。

Edit:澄清一下,从 Tar 存档中提取文件是likely(我没有检查具体细节)如下:(1)寻找封装部分(文件)的偏移量,(2)读取封装文件的块,将块写入目标文件(或管道) ,或 w/e), (3) 重复 (2),直到提取整个文件。

通过尝试使用相同文件句柄从并行进程中以非同步方式执行此操作,可能会导致这些步骤的混合,即开始处理文件#2 将远离文件#1,而我们处于中间读取文件 #1 等

Edit2回答下面的评论:内存表示是为子进程重新分叉的,这是真的;但内核端管理的资源(例如文件句柄和内核缓冲区)是共享的。

为了显示:

import multiprocessing

this = open(__file__, 'rb')


def read_file(worker):
    print(worker, this.read(80))


def main():
    processes = []

    for number in (1, 2):
        processes.append(
            multiprocessing.Process(target=read_file, args=(number,)))

    for process in processes:
        process.start()
    for process in processes:
        process.join()


if __name__ == '__main__':
    main()

在 Linux 上运行这个我得到:

$ python3.8 test.py 
1 b"import multiprocessing\n\nthis = open(__file__, 'rb')\n\n\ndef read_file(worker):\n   "
2 b''

如果查找和读取是独立的,则两个进程将打印相同的结果,但事实并非如此。由于这是一个小文件,并且 Python 选择缓冲少量数据(8 KiB),因此第一个进程会读取到 EOF,而第二个进程则没有剩余数据可供读取(当然,除非它回溯)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python 多处理:全局对象未正确复制到子级 的相关文章

随机推荐