How can I copy the first 100 files from a directory containing thousands of files using Python?


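I have a huge directory that is being updated all the time. I am trying to list only the latest 100 files in the directory using Python. I tried os.listdir(), but when the directory grows to around 100,000 files, listdir() seems to crash (or I have not waited long enough). I only need the first 100 files (or filenames) for further processing, so I don't want listdir() to be populated with all 100,000 files. Is there a good way of doing this in Python?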



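I searched like crazy for a Windows DLL that would let me do what can be done on Linux, but had no luck.

So I concluded that the only way was to create my own DLL that would expose those static functions to me, but then I remembered pywintypes. And, yeah! It is already done there. Even better, an iterator function is already implemented. Cool!

A Windows DLL with FindFirstFile(), FindNextFile() and FindClose() may still be out there somewhere, but I didn't find it. So I used pywintypes.

EDIT: I discovered (much too late) that these functions are available from kernel32.dll. They were hiding right in front of my nose the whole time.

Sorry about the dependency. But I think you can extract win32file.pyd from the ...\site-packages\win32 folder, together with its eventual dependencies, and distribute it with your program independently of win32types if you have to.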



NOTE: win32file.FindFilesIterator() returns the whole stat of each file/dir, so using my listdir() to get the name and then calling os.path.get*time() or os.path.is*() afterwards doesn't make sense. It is better to modify my listdir() to do those checks.
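
The same idea (take the metadata from the directory scan instead of asking for it again by name) can be sketched with os.scandir(), which the docstring of the code in this answer recommends. This is only a minimal sketch assuming Python 3.6+; the helper name newest_files is made up for illustration, and it picks the most recently modified files rather than whatever the directory happens to return first.

```python
import heapq
import os


def newest_files(path, count=100):
    """Return the names of the `count` most recently modified files in `path`.

    Hypothetical helper: entries are streamed from os.scandir() and only
    `count` candidates are kept in memory at any time.
    """
    def mtime(entry):
        # DirEntry.stat() caches its result on the entry object.
        return entry.stat().st_mtime

    with os.scandir(path) as entries:
        files = (e for e in entries if e.is_file())
        # nlargest() consumes the generator and returns newest first.
        newest = heapq.nlargest(count, files, key=mtime)
    return [e.name for e in newest]
```

Every entry is still visited once, so this does not avoid the full scan; it only avoids building and sorting a 100,000-item list. A file deleted between the scan and the stat() call would raise an error, which this sketch does not handle.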


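The bad news for you is that it starts with whichever item in the directory it likes first, and you cannot choose which one that is. In my tests it always returned the directory sorted. (On Windows.)

The good news is that on Windows you can use wildcards to control which files get listed. So, to use this on a constantly populating directory, you could mark new files with a version and do something like: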

from time import sleep

bunch = 1
while True:
    for name in listdir("mydir\\*bunch%i*" % bunch):
        print(name)
    sleep(5)
    bunch += 1


I don't know whether FindFilesIterator() will keep detecting new files as they arrive if you introduce a delay between loops.
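
Where the Windows wildcard is not available, a similar filter can be approximated in Python. The following is a rough sketch, assuming Python 3.6+ and the standard fnmatch module; iter_matching is a hypothetical name, and unlike the wildcard passed to FindFilesIterator() the filtering happens in Python, so every entry is still visited, just one at a time.

```python
import fnmatch
import os


def iter_matching(path, pattern):
    """Lazily yield names in `path` that match a shell-style `pattern`.

    Hypothetical helper: nothing is read from the directory until the
    caller starts iterating, and no full list is ever built.
    """
    with os.scandir(path) as entries:
        for entry in entries:
            if fnmatch.fnmatch(entry.name, pattern):
                yield entry.name
```

With the naming scheme from the loop above, iter_matching("mydir", "*bunch1*") would walk the directory and hand back only the names from the first bunch.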


You can always create an iterator in advance and then call its next() method to get the next file:

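from time import sleep

i = listdir(".")
while True:
    try:
        name = next(i)
    except StopIteration:
        sleep(1)
# This probably won't work as imagined though: once a generator is
# exhausted it keeps raising StopIteration, so new files never show up.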
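
For the original question (just the first 100 names, without waiting for the whole listing) the iterator idea can be combined with itertools.islice(). A minimal sketch, assuming Python 3.6+ with os.scandir(); first_names is a hypothetical helper, and the order of the names is whatever the operating system returns, not necessarily sorted or newest first.

```python
import os
from itertools import islice


def first_names(path, count=100):
    """Return up to `count` entry names from `path` without listing it all.

    Hypothetical helper: os.scandir() yields entries lazily and islice()
    stops pulling from it after `count` items.
    """
    with os.scandir(path) as entries:
        return [entry.name for entry in islice(entries, count)]
```

Because the scan stops after count entries, the cost should not depend on how many files the directory holds in total.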


However, win32file offers some functions that help you monitor a directory for changes, and I think that is your best bet.
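
I have not shown the win32file change-notification calls here. As a portable stand-in, the sketch below simply polls: it rescans the directory at a fixed interval and reports names that were not there before. It assumes Python 3.6+; scan_names, watch_new_files and the rounds parameter are made up for illustration, and each poll rescans the whole directory, so it is far less efficient than a real notification API.

```python
import os
import time


def scan_names(path):
    """Return the set of entry names currently in `path`."""
    with os.scandir(path) as entries:
        return {entry.name for entry in entries}


def watch_new_files(path, interval=1.0, rounds=None):
    """Return a generator of names that appear in `path` from now on.

    Hypothetical polling helper; `rounds` limits the number of polls
    (None means poll forever).
    """
    seen = scan_names(path)  # snapshot taken right away, not on first next()

    def poll():
        nonlocal seen
        polled = 0
        while rounds is None or polled < rounds:
            time.sleep(interval)
            current = scan_names(path)
            for name in sorted(current - seen):
                yield name
            seen = current
            polled += 1

    return poll()
```

Iterating over watch_new_files("mydir") would then block between polls and yield each new name once.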

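In the speed test you can also see that constructing a list from this iterator is slower than calling os.listdir(), but os.listdir() blocks and my listdir() does not. Its purpose isn't to create lists of files anyway. I don't know why this speed loss occurs. I can only guess it has something to do with DLL calls, list construction, sorting or something similar. os.listdir() is written entirely in C.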

You can see some usage in the if __name__ == "__main__" block. Save the code in listdir.py and 'from listdir import *' it.

Here is the code:

#! /usr/bin/env python

"""
An equivalent of os.listdir() but as a generator using ctypes on
Unix-like systems and pywintypes on Windows.

On Linux there is shared object libc.so that contains file manipulation
functions we need: opendir(), readdir() and closedir().
On Windows those manipulation functions are provided
by static library header windows.h. As pywintypes is a wrapper around
this API we will use it.
kernel32.dll contains FindFirstFile(), FindNextFile() and FindClose() as well and they can be used directly via ctypes.

The Unix version of this code is an adaptation of code provided by user
'jason-orendorff' on Stack Overflow answering a question by user 'adrien'.
The original URL is:

The Unix code is tested on Raspbian for now and it works. A reasonable
conclusion is that it'll work on all Debian based distros as well.

NOTE: dirent structure is not the same on all distros, so the code will break on some of them.

The code is also tested on Cygwin using cygwin1.dll and it
doesn't work.

If platform isn't Windows or Posix environment, listdir will be
redirected back to os.listdir().

NOTE: There is scandir module implementing this code with no dependencies, excellent error handling and portability. I found it only after putting together this code. scandir() is now included in the standard library of Python 3.5 as os.scandir().
You definitely should use scandir, not this code.
Scandir module is available on pypi.python.org.
"""

import sys, os

__all__ = ["listdir"]

if sys.platform.startswith("win"):
    from win32file import FindFilesIterator

    def listdir (path):
        """A generator to return the names of files in the directory passed in"""
        if "*" not in path and "?" not in path:
            st = os.stat(path) # Raise an error if dir doesn't exist or access is denied to us
            # Check if we got a dir or something else!
            # Check gotten from stat.py (for fast checking):
            if (st.st_mode & 0170000) != 0040000:
                e = OSError()
                e.errno = 20; e.filename = path; e.strerror = "Not a directory"
                raise e
            path = path.rstrip("\\/")+"\\*"
        # Else:  Decide that user knows what she/he is doing
        for file in FindFilesIterator(path):
            name = file[-2]
            # Unfortunately, only drives (eg. C:) don't include "." and ".." in the list:
            if name=="." or name=="..": continue
            yield name

elif os.name=="posix":
    if not sys.platform.startswith("linux"):
        print >> sys.stderr, "WARNING: Environment is Unix but platform is '"+sys.platform+"'\nlistdir() may not work properly."
    from ctypes import CDLL, c_char_p, c_int, c_long, c_ushort, c_byte, c_char, Structure, POINTER
    from ctypes.util import find_library

    class c_dir(Structure):
        """Opaque type for directory entries, corresponds to struct DIR"""

    c_dir_p = POINTER(c_dir)

    class c_dirent(Structure):
        """Directory entry"""
        # FIXME not sure these are the exactly correct types!
        _fields_ = (
            ('d_ino', c_long), # inode number
            ('d_off', c_long), # offset to the next dirent
            ('d_reclen', c_ushort), # length of this record
            ('d_type', c_byte), # type of file; not supported by all file system types
            ('d_name', c_char * 4096) # filename
        )

    c_dirent_p = POINTER(c_dirent)

    c_lib = CDLL(find_library("c"))
    # Extract functions:
    opendir = c_lib.opendir
    opendir.argtypes = [c_char_p]
    opendir.restype = c_dir_p

    readdir = c_lib.readdir
    readdir.argtypes = [c_dir_p]
    readdir.restype = c_dirent_p

    closedir = c_lib.closedir
    closedir.argtypes = [c_dir_p]
    closedir.restype = c_int

    def listdir(path):
        """A generator to return the names of files in the directory passed in"""
        st = os.stat(path) # Raise an error if path doesn't exist or we don't have permission to access it
        # Check if we got a dir or something else!
        # Check gotten from stat.py (for fast checking):
        if (st.st_mode & 0170000) != 0040000:
            e = OSError()
            e.errno = 20; e.filename = path; e.strerror = "Not a directory"
            raise e
        dir_p = opendir(path)
        try:
            while True:
                p = readdir(dir_p)
                if not p: break # End of directory
                name = p.contents.d_name
                if name!="." and name!="..": yield name
        finally: closedir(dir_p)

else:
    print >> sys.stderr, "WARNING: Platform is '"+sys.platform+"'!\nFalling back to os.listdir(), iterator generator will not be returned!"
    listdir = os.listdir

if __name__ == "__main__":
    if len(sys.argv)!=1:
        try: limit = int(sys.argv[2])
        except (IndexError, ValueError): limit = -1 # No limit given, or not a number
        count = 0
        for name in listdir(sys.argv[1]):
            if count==limit: break
            count += 1
            print repr(name),
        print "\nListed", count, "items from directory '%s'" % sys.argv[1]
    if len(sys.argv)!=1: sys.exit()
    from timeit import *
    print "Speed test:"
    dir = ("/etc", r"C:\WINDOWS\system32")[sys.platform.startswith("win")]
    t = Timer("l = listdir(%s)" % repr(dir), "from listdir import listdir")
    print "Measuring time required to create an iterator to list a directory:"
    time = t.timeit(200)
    print "Time required to return a generator for directory '"+dir+"' is", time, "seconds measured through 200 passes"
    t = Timer("l = os.listdir(%s)" % repr(dir), "import os")
    print "Measuring time required to create a list of directory in advance using os.listdir():"
    time = t.timeit(200)
    print "Time required to return a list for directory '"+dir+"' is", time, "seconds measured through 200 passes"
    t = Timer("l = []\nfor file in listdir(%s): l.append(file)" % repr(dir), "from listdir import listdir")
    print "Measuring time needed to create a list of directory using our listdir() instead of os.listdir():"
    time = t.timeit(200)
    print "Time required to create a list for directory '"+dir+"' using our listdir() instead of os.listdir() is", time, "seconds measured through 200 passes"


