4

我编写了一个简单的(测试)脚本来列出选定目录中的文件。不使用FindFirstFile;仅本机 API。当我执行脚本并观察时,Win32API 监视器告诉我 STATUS_SUCCESS。我的文件信息缓冲区是c_buffer(1024),不使用 Unicode 缓冲区来查看原始数据。

所以打电话后NtQueryDirectoryFile一切正常。当我c_buffer以原始模式写入控制台以查看目录中的文件时,输出不是结构化的。我创建了一个 FILE_DIRECTORY_INFORMATION 结构,但它不能在 Windows 7 X86 上运行,或者我的代码有问题。

我的问题:请告诉我在 Windows 7 X86 或任何变体上使用哪个 FILE_DIRECTORY_INFORMATION 结构

from ctypes import *

hFile = windll.kernel32.CreateFileW("C:\\a",0x80000000,0,0,3,0x02000000,0)

class Info(Union):
    _fields_ = [('STATUS',c_long),
                ('Pointer',c_ulong),]


class io_stat(Structure):
    _fields_ = [('Stat',Info),
                ('Information',c_ulong),]


class FILE_OBJECT(Structure):
    _fields_ = [('Next',c_ulong),
                ('FileIndex',c_ulong),
                ('ctime',c_longlong),
                ('lat',c_longlong),
                ('wtime',c_longlong),
                ('ch',c_longlong),
                ('Endogfile',c_longlong),
                ('allo',c_longlong),
                ('Fileattr',c_ulong),
                ('Filenalen',c_ulong),
                ('Filename',c_wchar * 2),]

b = io_stat()
a = c_buffer(1024)

windll.ntdll.NtQueryDirectoryFile(hFile,0,0,0,byref(b),byref(a),sizeof(a), 1,0,None,0)

print(a.raw)

未优化。

4

1 回答 1

8

NtQueryDirectoryFile应该在循环中调用,直到它返回STATUS_NO_MORE_FILES。如果返回的状态是STATUS_BUFFER_OVERFLOW或状态成功(非负)且状态块Information为 0,则将缓冲区大小加倍并重试。对于每个成功的传递,将记录复制FILE_DIRECTORY_INFORMATION出缓冲区。每条记录的大小都必须包含FileName. Next当字段为 0时,您已到达终点。

下面的示例子类FILE_DIRECTORY_INFORMATION化为一个DirEntry类,该类具有一个listbuf类方法来列出查询缓冲区中的记录。它跳过“。” 和“..”条目。ntlistdir它在一个函数中使用这个类,该函数通过 列出DirEntry给定目录的记录NtQueryDirectoryFile。它支持将打开的文件描述符作为path参数传递,这就像os.listdir在 POSIX 系统上的工作方式一样。

ctypes 定义

import os
import msvcrt
import ctypes

from ctypes import wintypes

ntdll = ctypes.WinDLL('ntdll')
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

def NtError(status):
    err = ntdll.RtlNtStatusToDosError(status)
    return ctypes.WinError(err)

NTSTATUS = wintypes.LONG
STATUS_BUFFER_OVERFLOW = NTSTATUS(0x80000005).value
STATUS_NO_MORE_FILES = NTSTATUS(0x80000006).value
STATUS_INFO_LENGTH_MISMATCH = NTSTATUS(0xC0000004).value

ERROR_DIRECTORY = 0x010B
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
GENERIC_READ = 0x80000000
FILE_SHARE_READ = 1
OPEN_EXISTING = 3
FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
FILE_ATTRIBUTE_DIRECTORY = 0x0010

FILE_INFORMATION_CLASS = wintypes.ULONG
FileDirectoryInformation = 1
FileBasicInformation = 4

LPSECURITY_ATTRIBUTES = wintypes.LPVOID
PIO_APC_ROUTINE = wintypes.LPVOID
ULONG_PTR = wintypes.WPARAM

class UNICODE_STRING(ctypes.Structure):
    _fields_ = (('Length',        wintypes.USHORT),
                ('MaximumLength', wintypes.USHORT),
                ('Buffer',        wintypes.LPWSTR))

PUNICODE_STRING = ctypes.POINTER(UNICODE_STRING)

class IO_STATUS_BLOCK(ctypes.Structure):
    class _STATUS(ctypes.Union):
        _fields_ = (('Status',  NTSTATUS),
                    ('Pointer', wintypes.LPVOID))
    _anonymous_ = '_Status',
    _fields_ = (('_Status',     _STATUS),
                ('Information', ULONG_PTR))

PIO_STATUS_BLOCK = ctypes.POINTER(IO_STATUS_BLOCK)

ntdll.NtQueryInformationFile.restype = NTSTATUS
ntdll.NtQueryInformationFile.argtypes = (
    wintypes.HANDLE,        # In  FileHandle
    PIO_STATUS_BLOCK,       # Out IoStatusBlock
    wintypes.LPVOID,        # Out FileInformation
    wintypes.ULONG,         # In  Length
    FILE_INFORMATION_CLASS) # In  FileInformationClass

ntdll.NtQueryDirectoryFile.restype = NTSTATUS
ntdll.NtQueryDirectoryFile.argtypes = (
    wintypes.HANDLE,        # In     FileHandle
    wintypes.HANDLE,        # In_opt Event
    PIO_APC_ROUTINE,        # In_opt ApcRoutine
    wintypes.LPVOID,        # In_opt ApcContext
    PIO_STATUS_BLOCK,       # Out    IoStatusBlock
    wintypes.LPVOID,        # Out    FileInformation
    wintypes.ULONG,         # In     Length
    FILE_INFORMATION_CLASS, # In     FileInformationClass
    wintypes.BOOLEAN,       # In     ReturnSingleEntry
    PUNICODE_STRING,        # In_opt FileName
    wintypes.BOOLEAN)       # In     RestartScan

kernel32.CreateFileW.restype = wintypes.HANDLE
kernel32.CreateFileW.argtypes = (
    wintypes.LPCWSTR,      # In     lpFileName
    wintypes.DWORD,        # In     dwDesiredAccess
    wintypes.DWORD,        # In     dwShareMode
    LPSECURITY_ATTRIBUTES, # In_opt lpSecurityAttributes
    wintypes.DWORD,        # In     dwCreationDisposition
    wintypes.DWORD,        # In     dwFlagsAndAttributes
    wintypes.HANDLE)       # In_opt hTemplateFile

class FILE_BASIC_INFORMATION(ctypes.Structure):
    _fields_ = (('CreationTime',   wintypes.LARGE_INTEGER),
                ('LastAccessTime', wintypes.LARGE_INTEGER),
                ('LastWriteTime',  wintypes.LARGE_INTEGER),
                ('ChangeTime',     wintypes.LARGE_INTEGER),
                ('FileAttributes', wintypes.ULONG))

class FILE_DIRECTORY_INFORMATION(ctypes.Structure):
    _fields_ = (('_Next',          wintypes.ULONG),
                ('FileIndex',      wintypes.ULONG),
                ('CreationTime',   wintypes.LARGE_INTEGER),
                ('LastAccessTime', wintypes.LARGE_INTEGER),
                ('LastWriteTime',  wintypes.LARGE_INTEGER),
                ('ChangeTime',     wintypes.LARGE_INTEGER),
                ('EndOfFile',      wintypes.LARGE_INTEGER),
                ('AllocationSize', wintypes.LARGE_INTEGER),
                ('FileAttributes', wintypes.ULONG),
                ('FileNameLength', wintypes.ULONG),
                ('_FileName',      wintypes.WCHAR * 1))

    @property
    def FileName(self):
        addr = ctypes.addressof(self) + type(self)._FileName.offset
        size = self.FileNameLength // ctypes.sizeof(wintypes.WCHAR)
        return (wintypes.WCHAR * size).from_address(addr).value

DirEntryntlistdir

class DirEntry(FILE_DIRECTORY_INFORMATION):
    def __repr__(self):
        return '<{} {!r}>'.format(self.__class__.__name__, self.FileName)

    @classmethod
    def listbuf(cls, buf):
        result = []
        base_size = ctypes.sizeof(cls) - ctypes.sizeof(wintypes.WCHAR)
        offset = 0
        while True:
            fdi = cls.from_buffer(buf, offset)
            if fdi.FileNameLength and fdi.FileName not in ('.', '..'):
                cfdi = cls()
                size = base_size + fdi.FileNameLength
                ctypes.resize(cfdi, size)
                ctypes.memmove(ctypes.byref(cfdi), ctypes.byref(fdi), size)
                result.append(cfdi)
            if fdi._Next:
                offset += fdi._Next
            else:
                break
        return result

def isdir(path):
    if not isinstance(path, int):
        return os.path.isdir(path)
    try:
        hFile = msvcrt.get_osfhandle(path)
    except IOError:
        return False
    iosb = IO_STATUS_BLOCK()
    info = FILE_BASIC_INFORMATION()
    status = ntdll.NtQueryInformationFile(hFile, ctypes.byref(iosb),
                ctypes.byref(info), ctypes.sizeof(info),
                FileBasicInformation)
    return bool(status >= 0 and info.FileAttributes & FILE_ATTRIBUTE_DIRECTORY)

def ntlistdir(path=None):
    result = []

    if path is None:
        path = os.getcwd()

    if isinstance(path, int):
        close = False
        fd = path
        hFile = msvcrt.get_osfhandle(fd)
    else:
        close = True
        hFile = kernel32.CreateFileW(path, GENERIC_READ, FILE_SHARE_READ,
                    None, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS, None)
        if hFile == INVALID_HANDLE_VALUE:
            raise ctypes.WinError(ctypes.get_last_error())
        fd = msvcrt.open_osfhandle(hFile, os.O_RDONLY)

    try:
        if not isdir(fd):
            raise ctypes.WinError(ERROR_DIRECTORY)
        iosb = IO_STATUS_BLOCK()
        info = (ctypes.c_char * 4096)()
        while True:
            status = ntdll.NtQueryDirectoryFile(hFile, None, None, None,
                        ctypes.byref(iosb), ctypes.byref(info),
                        ctypes.sizeof(info), FileDirectoryInformation,
                        False, None, False)
            if (status == STATUS_BUFFER_OVERFLOW or
                iosb.Information == 0 and status >= 0):
                info = (ctypes.c_char * (ctypes.sizeof(info) * 2))()
            elif status == STATUS_NO_MORE_FILES:
                break
            elif status >= 0:
                sublist = DirEntry.listbuf(info)
                result.extend(sublist)
            else:
                raise NtError(status)
    finally:
        if close:
            os.close(fd)

    return result

例子

if __name__ == '__main__':
    import sys
    for entry in ntlistdir(sys.exec_prefix):
        print(entry)

输出:

<DirEntry 'DLLs'>
<DirEntry 'include'>
<DirEntry 'Lib'>
<DirEntry 'libs'>
<DirEntry 'LICENSE.txt'>
<DirEntry 'NEWS.txt'>
<DirEntry 'python.exe'>
<DirEntry 'python.pdb'>
<DirEntry 'python3.dll'>
<DirEntry 'python36.dll'>
<DirEntry 'python36.pdb'>
<DirEntry 'python36_d.dll'>
<DirEntry 'python36_d.pdb'>
<DirEntry 'python3_d.dll'>
<DirEntry 'pythonw.exe'>
<DirEntry 'pythonw.pdb'>
<DirEntry 'pythonw_d.exe'>
<DirEntry 'pythonw_d.pdb'>
<DirEntry 'python_d.exe'>
<DirEntry 'python_d.pdb'>
<DirEntry 'Scripts'>
<DirEntry 'tcl'>
<DirEntry 'Tools'>
<DirEntry 'vcruntime140.dll'>
于 2014-12-12T17:28:16.963 回答