1

我在使用“GetExtendedTcpTable”时遇到了一些麻烦。当我尝试运行我的脚本时,我收到了这样的消息:

AssertionError: [Error 0] 操作成功完成

很少脚本正常工作,我不明白这个消息,操作完成,怎么了?

这是代码,我试图执行:

from ctypes import *
from ctypes.wintypes import *
from socket import  inet_aton,   inet_ntoa,  htons


AF_INET = 2
TCP_TABLE_BASIC_LISTENER = 0
TCP_TABLE_BASIC_CONNECTIONS = 1
TCP_TABLE_BASIC_ALL = 2
TCP_TABLE_OWNER_PID_LISTENER = 3
TCP_TABLE_OWNER_PID_CONNECTIONS = 4
TCP_TABLE_OWNER_PID_ALL = 5
TCP_TABLE_OWNER_MODULE_LISTENER = 6
TCP_TABLE_OWNER_MODULE_CONNECTIONS = 7
TCP_TABLE_OWNER_MODULE_ALL = 8

# for storing socket info python style.  
class socket_info:

    State = None
    LocalAddr = None
    LocalPort = None
    RemoteAddr = None
    RemotePort = None

    def __init__ (self, **kwargs):

        for key, word in kwargs.items():
            setattr(self, key, word)

def formatip (ip):
    ip = inet_aton (str(ip))
    return inet_ntoa (ip[::-1])

states = {
    1 : "TCP_STATE_CLOSED",
    2 : "TCP_STATE_LISTEN",
    3 : "TCP_STATE_SYN_SENT",
    4 : "TCP_STATE_SYN_RCVD",
    5 : "TCP_STATE_ESTAB",
    6 : "TCP_STATE_FIN_WAIT",
    7 : "TCP_STATE_FIN_WAIT2",
    8 : "TCP_STATE_CLOSE_WAIT",
    9 : "TCP_STATE_CLOSING",
    10 : "TCP_STATE_LAST_ACK",
    11 : "TCP_STATE_TIME_WAIT",
    12 : "TCP_STATE_DELETE_TCB",

    "TCP_STATE_CLOSED" : 1,
    "TCP_STATE_LISTEN" : 2,
    "TCP_STATE_SYN_SENT" : 3,
    "TCP_STATE_SYN_RCVD" : 4,
    "TCP_STATE_ESTAB" : 5,
    "TCP_STATE_FIN_WAIT" : 6,
    "TCP_STATE_FIN_WAIT2" : 7,
    "TCP_STATE_CLOSE_WAIT" : 8,
    "TCP_STATE_CLOSING" : 9,
    "TCP_STATE_LAST_ACK" :10,
    "TCP_STATE_TIME_WAIT" : 11,
    "TCP_STATE_DELETE_TCB" : 12 }

class MIB_TCPROW_OWNER_PID(Structure):
    _fields_ = [
        ("dwState", DWORD),
        ("dwLocalAddr", DWORD),
        ("dwLocalPort", DWORD),
        ("dwRemoteAddr", DWORD),
        ("dwRemotePort", DWORD),
        ("dwOwningPid", DWORD)
        ]


class MIB_TCPTABLE_OWNER_PID(Structure):
    _fields_ = [
        ("dwNumEntries", DWORD),
        ("MIB_TCPROW_OWNER_PID", MIB_TCPROW_OWNER_PID * 100)
        ]



def GetExtendedTcpTable (vip=AF_INET):
    table = MIB_TCPTABLE_OWNER_PID ()
    so = sizeof (table)
    size = DWORD (so)
    order = c_int(1)

    failure= windll.iphlpapi.GetExtendedTcpTable (
        byref (table),
        addressof (size),
        order,
        vip,
        TCP_TABLE_OWNER_PID_ALL,
        0    )

    assert not failure,  WinError (GetLastError ())

    pytables = []
    tables = table.MIB_TCPROW_OWNER_PID

    for index in range(table.dwNumEntries):
        table = tables [index]
        pytables.append (
            socket_info (
                State=states.get (table.dwState, "UNKNOWN_STATE_%s" %(str(table.dwState))),
                LocalAddr=formatip (table.dwLocalAddr),
                LocalPort=htons(table.dwLocalPort),
                RemoteAddr=formatip (table.dwRemoteAddr),
                RemotePort=htons(table.dwRemotePort),
                OwningPid = int (table.dwOwningPid)
            )
        )
    return pytables


def GetTcpTableForPid (pid):
    tables = GetExtendedTcpTable ()
    for table in tables:
        if table.OwningPid == pid: return table
    raise "Cannot find tcp table for pid %s" %pid

dict_process = {}
pid_set =set()
pid_list = []
tcp_info_list = []
tcp_info = GetExtendedTcpTable()
for item in tcp_info:
    LocalAddr = item.LocalAddr
    LocalPort = item.LocalPort
    RemoteAddr = item.RemoteAddr
    RemotePort = item.RemotePort
    OwningPid = item.OwningPid
    print('local Addr: '+ LocalAddr,'local port: '+ str(LocalPort),'remote Addr: ' + RemoteAddr, 'Remote Port: ' + str(RemotePort), OwningPid)

该脚本不时运行。它可以运行 5 分钟,然后由于这个愚蠢的错误而无法运行大约一个小时。如何绕过它?

我真的不知道,它是怎么回事。请帮帮我,我做错了什么?

我在 Win7 SP1 x64 上使用 python 3.2

十分感谢!

4

1 回答 1

2

你不应该使用addressof(size). 这将返回一个 Python 整数,该整数将被转换为 32 位 C int。用于byref(size)创建一个指针,如果您使用的是 64 位 Python,它将是一个 64 位值。

GetExtendedTcpTable不叫SetLastError。它返回DWORD带有以下代码之一的 a:

NO_ERROR = 0
ERROR_INVALID_PARAMETER = 87
ERROR_INSUFFICIENT_BUFFER = 122

pdwSize如果缓冲区太小,则参数具有所需的大小。这里的一种选择是从长度为 0 的数组开始;然后resize是结构;最后cast将数组调整为正确的大小:

class MIB_TCPTABLE_OWNER_PID(Structure):
    _fields_ = [
        ("dwNumEntries", DWORD),
        ("MIB_TCPROW_OWNER_PID", MIB_TCPROW_OWNER_PID * 0),
    ]

_GetExtendedTcpTable = windll.iphlpapi.GetExtendedTcpTable

def GetExtendedTcpTable(vip=AF_INET):
    table = MIB_TCPTABLE_OWNER_PID()
    size = DWORD() 
    order = 1

    failure = _GetExtendedTcpTable(
                  byref(table),
                  byref(size),
                  order,
                  vip,
                  TCP_TABLE_OWNER_PID_ALL,
                  0)

    if failure == ERROR_INSUFFICIENT_BUFFER:
        resize(table, size.value)
        memset(byref(table), 0, sizeof(table))
        failure = _GetExtendedTcpTable(
                      byref(table),
                      byref(size),
                      order,
                      vip,
                      TCP_TABLE_OWNER_PID_ALL,
                      0)

    if failure: 
        raise WinError(failure)

    ptr_type = POINTER(MIB_TCPROW_OWNER_PID * table.dwNumEntries)
    tables = cast(table.MIB_TCPROW_OWNER_PID, ptr_type)[0]

    pytables = []
    for table in tables:
        # rest unchanged

关于 Win32 的值,一般在 Python 中LastError不应该依赖。GetLastError您不知道您是否看到来自先前调用的旧错误代码,或者干预调用是否修改了该LastError值。如果您正在检查使用 的单个 API 调用,那么在调用失败后立即LastError检查应该没问题。GetLastError但更一般地,您可能需要使用以下命令加载 DLL use_last_error=True

iphlpapi = WinDLL('iphlpapi', use_last_error=True)

从此WinDLL实例创建的函数指针将LastError在调用返回后立即保存到线程本地存储中。调用get_last_error返回保存的错误代码。在调用函数之前,您可以事先调用set_last_error(0)以将 0 换入。LastError

于 2013-08-05T21:50:55.717 回答