5

我最近开始在 Windows 上学习 IOCP,并一直在阅读以下文章:

http://www.codeproject.com/Tips/95363/Another-TCP-echo-server-using-IOCP

您可以从以下位置下载文章的示例:

http://dl.dropbox.com/u/281215/documentation/iocp-1.00.html

该示例包含两个简单的应用程序 - iocp_echo_serverTcpEchoClient

我了解 IOCP 通常用于客户端/服务器模型的服务器端,但我想使用 IOCP 创建客户端。

到目前为止,我已经尝试修改上面的客户端示例,以便每当服务器向客户端发送响应时,它都会自动被拾取,但是它不起作用。

我保留了 iocp_echo_server.c 原样。我修改后的 TcpEchoClient.c 版本如下所示:

//TcpEchoClient.c - a minimalistic echo client
// -----------------------------------------------------------------------------

// C language includes
#include <stdio.h>
#include <winsock2.h>
#include "mswsock.h"  // for AcceptEx
#include <stdlib.h> // exit
#include <string.h>

// Windows includes
#include <windows.h>

#pragma warning(disable: 4996) // sprintf

// -----------------------------------------------------------------------------

// configuration
enum
{
    BUFLEN = 1000,
    SERVICE_PORT = 4000,
    SERVER_ADDRESS = INADDR_LOOPBACK
};

enum // socket operations
{
    OP_NONE,
    OP_ACCEPT,
    OP_READ,
    OP_WRITE
};

typedef struct _SocketState // socket state & control
{
    char operation;
    SOCKET socket;
    DWORD length;
    char buf[1024];
} SocketState;

// variables
static HANDLE cpl_port;

static SOCKET sock;
static SocketState sock_state;
static WSAOVERLAPPED sock_ovl;

static LPFN_ACCEPTEX pfAcceptEx;
static GUID GuidAcceptEx = WSAID_ACCEPTEX;

static int msgNumber;
static char msgBuf[BUFLEN];
static struct sockaddr_in sin;

// prototypes
static void createConnection(void);
static void createSocket(void);
static void init(void);
static void initWinsock(void);
static void prepareEndpoint(void);
static void recvBuffer(void);
static void run(void);
static void sendBuffer(void);

static SOCKET create_accepting_socket(void);
static void create_io_completion_port(void);
static BOOL get_completion_status(DWORD*, SocketState**,WSAOVERLAPPED**);

// -----------------------------------------------------------------------------

void main(void)
{
    init();
    run();
}

// -----------------------------------------------------------------------------

static void createConnection(void)
{
    printf("* connecting\n");
    if (WSAConnect(sock, (LPSOCKADDR)&sin, sizeof(sin), NULL, NULL, NULL, NULL) == SOCKET_ERROR)
    {
        int err = WSAGetLastError();
        printf("* error %d in connect\n", err);
        exit(1);
    }
    printf("* connected\n");
}

// -----------------------------------------------------------------------------

static void createSocket(void)
{
    sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (sock == INVALID_SOCKET)
    {
        int err = WSAGetLastError();
        printf("* error %d creating socket\n", err);
        exit(1);
    }

    // for use by AcceptEx
    sock_state.socket = 0; // to be updated later
    sock_state.operation = OP_ACCEPT;

    if (CreateIoCompletionPort((HANDLE)sock, cpl_port, (ULONG_PTR)&sock_state, 0) != cpl_port)
    {
        int err = WSAGetLastError();
        printf("* error %d in listener\n", err);
        exit(1);
    }
}

// -----------------------------------------------------------------------------

static void init(void)
{
    initWinsock();
    create_io_completion_port();
    createSocket();
    prepareEndpoint();
    createConnection();
}

// -----------------------------------------------------------------------------

static void initWinsock(void)
{
    WSADATA wsaData;

    if (WSAStartup(0x202, &wsaData) == SOCKET_ERROR)
    {
        int err = WSAGetLastError();
        printf("* error %d in WSAStartup\n", err);
        exit(1);
    }
}
// -----------------------------------------------------------------------------

static void prepareEndpoint(void)
{
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(SERVER_ADDRESS);
    sin.sin_port = htons(SERVICE_PORT);

    // bind_listening_socket()
    {
        //if (bind(sock, (SOCKADDR*)&sin, sizeof(sin)) == SOCKET_ERROR)
        //{
        //    printf("* error in bind!\n");
        //    exit(1);
        //}
    }

    // start_listening()
    {
        //if (listen(sock, 100) == SOCKET_ERROR)
        //{
        //    printf("* error in listen!\n");
        //    exit(1);
        //}
        //printf("* started listening for connection requests...\n");
    }

    // load_accept_ex()
    {
        //DWORD dwBytes;

        // black magic for me!!!
        // You do not need to call in your code WSAIoctl. You can directly use AcceptEx and adds Mswsock.lib.
        //WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof(GuidAcceptEx), &pfAcceptEx, sizeof(pfAcceptEx), &dwBytes, NULL, NULL);
    }

    // start_accepting()
    {
        //SOCKET acceptor = create_accepting_socket();
        //DWORD expected = sizeof(struct sockaddr_in) + 16;

        //printf("* started accepting connections...\n");

        // uses listener's completion key and overlapped structure
        //sock_state.socket = acceptor;
        //memset(&sock_ovl, 0, sizeof(WSAOVERLAPPED));

        // starts asynchronous accept
        //if (!pfAcceptEx(sock, acceptor, sock_state.buf, 0 /* no recv */, expected, expected, NULL, &sock_ovl))
        //{
        //    int err = WSAGetLastError();
        //    if (err != ERROR_IO_PENDING)
        //    {
        //        printf("* error %d in AcceptEx\n", err);
        //        exit(1);
        //    }
        //}
    }
}

// -----------------------------------------------------------------------------

static void recvBuffer(void)
{
    char* buf = msgBuf;
    int pendingLen = BUFLEN;

    printf("* receiving reply\n");

    while (pendingLen > 0)
    {
        int partialLen = recv(sock, buf, pendingLen, 0);

        if (partialLen > 0)
        {
            pendingLen -= partialLen;
            buf += partialLen;
            continue;
        }

        // ------

        if (partialLen == 0)
        {
            printf("* connection closed by the server\n");
        }
        else // partialLen < 0
        {
            int err = WSAGetLastError();
            printf("* error %d in recv\n", err);
        }

        exit(1);
    }
}

// -----------------------------------------------------------------------------

static void run(void)
{
    DWORD length;
    BOOL resultOk;
    WSAOVERLAPPED* ovl_res;
    SocketState* socketState;

    for (;;)
    {
        sendBuffer();

        resultOk = get_completion_status(&length, &socketState, &ovl_res);

        recvBuffer();
    }
}

// -----------------------------------------------------------------------------

static void sendBuffer(void)
{
    char* buf = msgBuf;
    int pendingLen = BUFLEN;

    printf("* sending message\n");
    sprintf(msgBuf, "%05 *****", msgNumber++);

    while (pendingLen > 0)
    {
        int partialLen = send(sock, buf, pendingLen, 0);

        if (partialLen > 0)
        {
            pendingLen -= partialLen;
            buf += partialLen;
            continue;
        }

        // -----------

        if (partialLen == 0)
        {
            printf("* connection closed by the server\n");
        }
        else // partialLen < 0
        {
            int err = WSAGetLastError();
            printf("* error %d in send\n", err);
        }

        exit(1);
    }
}

// -----------------------------------------------------------------------------

static SOCKET create_accepting_socket(void)
{
    SOCKET acceptor = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (acceptor == INVALID_SOCKET)
    {
        printf("* error creating accept socket!\n");
        exit(1);
    }
    return acceptor;
}

// -----------------------------------------------------------------------------

static void create_io_completion_port(void)
{
    cpl_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (!cpl_port)
    {
        int err = WSAGetLastError();
        printf("* error %d in line %d CreateIoCompletionPort\n", err, __LINE__);
        exit(1);
    }
}

// -----------------------------------------------------------------------------

static BOOL get_completion_status(DWORD* length, SocketState** socketState, WSAOVERLAPPED** ovl_res)
{
    BOOL resultOk;
    *ovl_res = NULL;
    *socketState = NULL;

    resultOk = GetQueuedCompletionStatus(cpl_port, length, (PULONG_PTR)socketState, ovl_res, INFINITE);

    if (!resultOk)
    {
        DWORD err = GetLastError();
        printf("* error %d getting completion port status!!!\n", err);
    }

    if (!*socketState || !*ovl_res)
    {
        printf("* don't know what to do, aborting!!!\n");
        exit(1);
    }

    return resultOk;
}

// -----------------------------------------------------------------------------
// the end

当服务器通过调用发送响应时:

WSASend(socketState->socket, &wsabuf, 1, NULL, 0, ovl, NULL)

我希望它会在这条线上被客户接收:

resultOk = get_completion_status(&length, &socketState, &ovl_res);

但它没有……</p>

有人能告诉我我做错了什么吗?

编辑:

我采取了以下几点:

  • 在客户端,您使用 WSAConnect() 创建出站连接。
  • 需要时调用 WSARecv() 和 WSASend() 开始读/写操作
  • 如果要使用 I/O 完成端口,则必须使用 WSASend/WSARecv。

并尝试创建一个简单的基于 IOCP 的客户端:

#include <iostream>
#include <winsock2.h>
#pragma comment(lib,"ws2_32.lib")

static DWORD WINAPI ClientWorkerThread(LPVOID lpParameter);

int main(void)
{
    WSADATA WsaDat;
    if (WSAStartup(MAKEWORD(2, 2), &WsaDat) != NO_ERROR)
        return 0;

    // Step 1 - Create an I/O completion port.
    HANDLE hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (!hCompletionPort)
        return 0;

    // Step 2 - Find how many processors.
    SYSTEM_INFO systemInfo;
    GetSystemInfo(&systemInfo);
    const int nNumberOfProcessors = systemInfo.dwNumberOfProcessors;

    // Step 3 - Create worker threads.
    for (int i = 0; i < nNumberOfProcessors; i++)
    {
        HANDLE hThread = CreateThread(NULL, 0, ClientWorkerThread, hCompletionPort, 0, NULL);
        CloseHandle(hThread);
    }

    // Step 4 - Create a socket.
    SOCKET Socket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (Socket == INVALID_SOCKET)
        return 0;

    struct hostent *host;
    if ((host = gethostbyname("localhost")) == NULL)
        return 0;

    SOCKADDR_IN SockAddr;
    SockAddr.sin_family = AF_INET;
    SockAddr.sin_addr.s_addr = *((unsigned long*)host->h_addr);
    SockAddr.sin_port = htons(8888);

    // Step 5 - Associate the socket with the I/O completion port.
    CreateIoCompletionPort((HANDLE)Socket, hCompletionPort, (ULONG_PTR)0, 0);

    if (WSAConnect(Socket, (SOCKADDR*)(&SockAddr), sizeof(SockAddr), NULL, NULL, NULL, NULL) == SOCKET_ERROR)
        return 0;

    char buffer[1000];
    memset(buffer, 0, 999);
    WSABUF wsaBuf = {strlen(buffer), buffer};
    DWORD dwSendBytes = 0;
    DWORD dwReceivedBytes = 0;
    DWORD dwFlags = 0;
    WSAOVERLAPPED wsaOverlapped;
    SecureZeroMemory((PVOID)&wsaOverlapped, sizeof(wsaOverlapped));
    wsaOverlapped.hEvent = WSACreateEvent();

    for(;;)
    {
        WSARecv(Socket, &wsaBuf, 1, &dwReceivedBytes, &dwFlags, &wsaOverlapped, NULL);
        std::cout << wsaBuf.buf;

        //WSASend(Socket, &wsaBuf, 1, &dwSendBytes, 0, &wsaOverlapped, NULL);

        int nError = WSAGetLastError();
        if(nError != WSAEWOULDBLOCK&&nError != 0)
        {
            std::cout << "Winsock error code: " << nError << "\r\n";
            std::cout << "Server disconnected!\r\n";
            shutdown(Socket, SD_SEND);
            closesocket(Socket);

            break;
        }
        Sleep(1000);
    }

    WSACleanup();
    system("PAUSE");
    return 0;
}

static DWORD WINAPI ClientWorkerThread(LPVOID lpParameter)
{
    HANDLE hCompletionPort = (HANDLE)lpParameter;
    DWORD dwBytesTransferred = 0;

    while (TRUE)
    {
        BOOL bRet = GetQueuedCompletionStatus(hCompletionPort, &dwBytesTransferred, (LPDWORD)0, (LPOVERLAPPED*)0, INFINITE);
    }

    return 0;
}

我知道有几件事我做错了,但我不知道它们是什么。

有人可以看看我的代码并给我一些提示吗?

非常感谢

编辑2:

抱歉这篇文章太长了。

在阅读了下面 Remy 的评论后,我又尝试实现基于 IOCP 的客户端,但我仍然不确定我是否走在正确的轨道上。

如果有人可以在下面查看我的新代码(在 VS2010 下编译良好且省略错误检查)并给我一些反馈,我将不胜感激。

非阻塞客户端:

#include <iostream>
#include <winsock2.h>
#pragma comment(lib, "ws2_32.lib")

static DWORD WINAPI ClientWorkerThread(LPVOID lpParameter);

typedef struct _PER_HANDLE_DATA 
{
    SOCKET Socket;
} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;

typedef struct
{
    WSAOVERLAPPED wsaOverlapped;
    WSABUF wsaBuf;
    int OperationType;
} PER_IO_DATA, * LPPER_IO_DATA;

int main(void)
{
    WSADATA WsaDat;
    WSAStartup(MAKEWORD(2, 2), &WsaDat);

    // Step 1 - Create an I/O completion port.
    HANDLE hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

    // Step 2 - Find how many processors.
    SYSTEM_INFO systemInfo;
    GetSystemInfo(&systemInfo);

    // Step 3 - Create worker threads.
    for (int i = 0; i < (int)systemInfo.dwNumberOfProcessors; i++)
    {
        HANDLE hThread = CreateThread(NULL, 0, ClientWorkerThread, hCompletionPort, 0, NULL);
        CloseHandle(hThread);
    }

    // Step 4 - Create a socket.
    SOCKET Socket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);

    PER_HANDLE_DATA *pPerHandleData = new PER_HANDLE_DATA;
    pPerHandleData->Socket = Socket;

    struct hostent *host;
    host = gethostbyname("localhost");

    SOCKADDR_IN SockAddr;
    SockAddr.sin_family = AF_INET;
    SockAddr.sin_addr.s_addr = *((unsigned long*)host->h_addr);
    SockAddr.sin_port = htons(8888);

    // Step 5 - Associate the socket with the I/O completion port.
    CreateIoCompletionPort((HANDLE)Socket, hCompletionPort, (DWORD)pPerHandleData, 0);

    WSAConnect(Socket, (SOCKADDR*)(&SockAddr), sizeof(SockAddr), NULL, NULL, NULL, NULL);

    static char buffer[1000];
    memset(buffer, 0, 999);

    PER_IO_DATA *pPerIoData = new PER_IO_DATA;

    pPerIoData->wsaBuf.buf = buffer;
    pPerIoData->wsaBuf.len = sizeof(buffer);

    DWORD dwSendBytes = 0;
    DWORD dwReceivedBytes = 0;
    DWORD dwFlags = 0;

    SecureZeroMemory((PVOID)&pPerIoData->wsaOverlapped, sizeof(pPerIoData->wsaOverlapped));
    pPerIoData->wsaOverlapped.hEvent = WSACreateEvent();

    WSARecv(Socket, &pPerIoData->wsaBuf, 1, &dwReceivedBytes, &dwFlags, &pPerIoData->wsaOverlapped, NULL);
    std::cout << pPerIoData->wsaBuf.buf;

    for (;;)
    {
        int nError = WSAGetLastError();
        if (nError != WSAEWOULDBLOCK&&nError != 0)
        {
            std::cout << "Winsock error code: " << nError << "\r\n";
            std::cout << "Server disconnected!\r\n";
            shutdown(Socket, SD_SEND);
            closesocket(Socket);

            break;
        }

        Sleep(1000);
    }

    delete pPerHandleData;
    delete pPerIoData;
    WSACleanup();
    return 0;
}

static DWORD WINAPI ClientWorkerThread(LPVOID lpParameter)
{
    HANDLE hCompletionPort = (HANDLE)lpParameter;
    DWORD bytesCopied = 0;
    OVERLAPPED *overlapped = 0;
    LPPER_HANDLE_DATA PerHandleData;
    LPPER_IO_DATA PerIoData;
    DWORD SendBytes, RecvBytes;
    DWORD Flags;
    BOOL bRet;

    while (TRUE)
    {
        bRet = GetQueuedCompletionStatus(hCompletionPort, &bytesCopied, (LPDWORD)&PerHandleData, (LPOVERLAPPED*)&PerIoData, INFINITE);

        if (bytesCopied == 0)
        {
            break;
        }
        else
        {
            Flags = 0;
            ZeroMemory(&(PerIoData->wsaOverlapped), sizeof(WSAOVERLAPPED));

            PerIoData->wsaBuf.len = 1000;
            WSARecv(PerHandleData->Socket, &(PerIoData->wsaBuf), 1, &RecvBytes, &Flags, &(PerIoData->wsaOverlapped), NULL);
        }
    }

    return 0;
}

非阻塞服务器:

#include <iostream>
#include <winsock2.h>
#pragma comment(lib,"ws2_32.lib")

int main()
{
    WSADATA WsaDat;
    WSAStartup(MAKEWORD(2,2), &WsaDat);

    SOCKET listenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);

    SOCKADDR_IN server;
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = INADDR_ANY;
    server.sin_port = htons(8888);

    bind(listenSocket, (SOCKADDR*)(&server), sizeof(server));

    listen(listenSocket, 1);

    SOCKET acceptSocket = SOCKET_ERROR;
    sockaddr_in saClient;
    int nClientSize = sizeof(saClient);
    while (acceptSocket == SOCKET_ERROR)
    {
        std::cout << "Waiting for incoming connections...\r\n";
        acceptSocket = WSAAccept(listenSocket, (SOCKADDR*)&saClient, &nClientSize, NULL, NULL);
    }

    std::cout << "Client connected!\r\n\r\n";

    char *szMessage = "Welcome to the server!\r\n";
    WSAOVERLAPPED SendOverlapped;
    DWORD SendBytes;

    WSABUF DataBuf;
    DataBuf.len = 1000;
    DataBuf.buf = szMessage;

    SecureZeroMemory((PVOID)&SendOverlapped, sizeof(WSAOVERLAPPED));
    SendOverlapped.hEvent = WSACreateEvent();

    for (;;)
    {
        WSASend(acceptSocket, &DataBuf, 1, &SendBytes, 0, &SendOverlapped, NULL);

        int nError = WSAGetLastError();
        if (nError != WSAEWOULDBLOCK && nError != 0)
        {
            std::cout << "Winsock error code: " << nError << "\r\n";
            std::cout << "Client disconnected!\r\n";

            shutdown(acceptSocket, SD_SEND);
            closesocket(acceptSocket);

            break;
        }

        Sleep(1000);
    }

    WSACleanup();
    return 0;
}

再次感谢!

4

2 回答 2

6

尝试这样的事情:

客户:

#include <iostream>
#include <string>
#include <winsock2.h> 
#pragma comment(lib, "ws2_32.lib") 

typedef struct 
{ 
    WSAOVERLAPPED Overlapped; 
    SOCKET Socket; 
    WSABUF wsaBuf; 
    char Buffer[1024];
    DWORD Flags;
} PER_IO_DATA, * LPPER_IO_DATA; 

static DWORD WINAPI ClientWorkerThread(LPVOID lpParameter) 
{ 
    HANDLE hCompletionPort = (HANDLE)lpParameter; 
    DWORD NumBytesRecv = 0; 
    ULONG CompletionKey; 
    LPPER_IO_DATA PerIoData; 

    while (GetQueuedCompletionStatus(hCompletionPort, &NumBytesRecv, &CompletionKey, (LPOVERLAPPED*)&PerIoData, INFINITE))
    {
        if (!PerIoData)
            continue;

        if (NumBytesRecv == 0) 
        {
            std::cout << "Server disconnected!\r\n\r\n";  
        }
        else
        {
            // use PerIoData->Buffer as needed...
            std::cout << std::string(PerIoData->Buffer, NumBytesRecv);

            PerIoData->wsaBuf.len = sizeof(PerIoData->Buffer); 
            PerIoData->Flags = 0; 

            if (WSARecv(PerIoData->Socket, &(PerIoData->wsaBuf), 1, &NumBytesRecv, &(PerIoData->Flags), &(PerIoData->Overlapped), NULL) == 0)
                continue;

            if (WSAGetLastError() == WSA_IO_PENDING)
                continue;
        }

        closesocket(PerIoData->Socket);
        delete PerIoData;
    } 

    return 0; 
} 

int main(void) 
{ 
    WSADATA WsaDat; 
    if (WSAStartup(MAKEWORD(2, 2), &WsaDat) != 0)
        return 0; 

    HANDLE hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); 
    if (!hCompletionPort)
        return 0;

    SYSTEM_INFO systemInfo; 
    GetSystemInfo(&systemInfo); 

    for (DWORD i = 0; i < systemInfo.dwNumberOfProcessors; ++i) 
    { 
        HANDLE hThread = CreateThread(NULL, 0, ClientWorkerThread, hCompletionPort, 0, NULL); 
        CloseHandle(hThread); 
    } 

    SOCKET Socket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); 
    if (Socket == INVALID_SOCKET)
        return 0;

    SOCKADDR_IN SockAddr; 
    SockAddr.sin_family = AF_INET; 
    SockAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); 
    SockAddr.sin_port = htons(8888); 

    CreateIoCompletionPort((HANDLE)Socket, hCompletionPort, 0, 0); 

    if (WSAConnect(Socket, (SOCKADDR*)(&SockAddr), sizeof(SockAddr), NULL, NULL, NULL, NULL) == SOCKET_ERROR)
        return 0;

    PER_IO_DATA *pPerIoData = new PER_IO_DATA;
    ZeroMemory(pPerIoData, sizeof(PER_IO_DATA)); 

    pPerIoData->Socket = Socket; 
    pPerIoData->Overlapped.hEvent = WSACreateEvent(); 
    pPerIoData->wsaBuf.buf = pPerIoData->Buffer; 
    pPerIoData->wsaBuf.len = sizeof(pPerIoData->Buffer); 

    DWORD dwNumRecv;
    if (WSARecv(Socket, &(pPerIoData->wsaBuf), 1, &dwNumRecv, &(pPerIoData->Flags), &(pPerIoData->Overlapped), NULL) == SOCKET_ERROR)
    {
        if (WSAGetLastError() != WSA_IO_PENDING)
        {
            delete pPerIoData;
            return 0;
        }
    } 

    while (TRUE) 
        Sleep(1000); 

    shutdown(Socket, SD_BOTH); 
    closesocket(Socket); 

    WSACleanup(); 
    return 0; 
} 

服务器:

#include <iostream>  
#include <winsock2.h>  
#pragma comment(lib,"ws2_32.lib")  

typedef struct
{
    WSAOVERLAPPED Overlapped;
    SOCKET Socket;
    WSABUF wsaBuf;
    char Buffer[1024];
    DWORD BytesSent;
    DWORD BytesToSend;
} PER_IO_DATA, * LPPER_IO_DATA; 


static DWORD WINAPI ServerWorkerThread(LPVOID lpParameter)
{
    HANDLE hCompletionPort = (HANDLE)lpParameter;
    DWORD NumBytesSent = 0;
    ULONG CompletionKey;
    LPPER_IO_DATA PerIoData;

    while (GetQueuedCompletionStatus(hCompletionPort, &NumBytesSent, &CompletionKey, (LPOVERLAPPED*)&PerIoData, INFINITE))    
    {
        if (!PerIoData)
            continue;

        if (NumBytesSent == 0)
        {
            std::cout << "Client disconnected!\r\n\r\n";
        }
        else
        {
            PerIoData->BytesSent += NumBytesSent;
            if (PerIoData->BytesSent < PerIoData->BytesToSend)
            {
                PerIoData->wsaBuf.buf = &(PerIoData->Buffer[PerIoData->BytesSent]);
                PerIoData->wsaBuf.len = (PerIoData->BytesToSend - PerIoData->BytesSent);
            }
            else
            {
                PerIoData->wsaBuf.buf = PerIoData->Buffer;
                PerIoData->wsaBuf.len = strlen(PerIoData->Buffer);
                PerIoData->BytesSent = 0;
                PerIoData->BytesToSend = PerIoData->wsaBuf.len;
            }

            if (WSASend(PerIoData->Socket, &(PerIoData->wsaBuf), 1, &NumBytesSent, 0, &(PerIoData->Overlapped), NULL) == 0)
                continue;

            if (WSAGetLastError() == WSA_IO_PENDING)
                continue;
        }

        closesocket(PerIoData->Socket);
        delete PerIoData;
    }

    return 0;
} 

int main()  
{  
    WSADATA WsaDat;  
    if (WSAStartup(MAKEWORD(2,2), &WsaDat) != 0)
        return 0;  

    HANDLE hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (!hCompletionPort)
        return 0;

    SYSTEM_INFO systemInfo;
    GetSystemInfo(&systemInfo);

    for (DWORD i = 0; i < systemInfo.dwNumberOfProcessors; ++i)
    {
        HANDLE hThread = CreateThread(NULL, 0, ServerWorkerThread, hCompletionPort, 0, NULL);
        CloseHandle(hThread);
    } 

    SOCKET listenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);  
    if (listenSocket == INVALID_SOCKET)
        return 0;  

    SOCKADDR_IN server;
    ZeroMemory(&server, sizeof(server));
    server.sin_family = AF_INET;  
    server.sin_addr.s_addr = INADDR_ANY;  
    server.sin_port = htons(8888);  

    if (bind(listenSocket, (SOCKADDR*)(&server), sizeof(server)) != 0)
        return 0;  

    if (listen(listenSocket, 1) != 0)
        return 0;  

    std::cout << "Waiting for incoming connection...\r\n";  

    SOCKET acceptSocket;  
    do  
    {  
        sockaddr_in saClient;  
        int nClientSize = sizeof(saClient);  
        acceptSocket = WSAAccept(listenSocket, (SOCKADDR*)&saClient, &nClientSize, NULL, NULL);  
    }
    while (acceptSocket == INVALID_SOCKET);

    std::cout << "Client connected!\r\n\r\n";  

    CreateIoCompletionPort((HANDLE)acceptSocket, hCompletionPort, 0, 0); 

    LPPER_IO_DATA pPerIoData = new PER_IO_DATA;
    ZeroMemory(pPerIoData, sizeof(PER_IO_DATA));

    strcpy(pPerIoData->Buffer, "Welcome to the server!\r\n");  

    pPerIoData->Overlapped.hEvent = WSACreateEvent(); 
    pPerIoData->Socket = acceptSocket; 
    pPerIoData->wsaBuf.buf = pPerIoData->Buffer;  
    pPerIoData->wsaBuf.len = strlen(pPerIoData->Buffer);  
    pPerIoData->BytesToSend = pPerIoData->wsaBuf.len;  

    DWORD dwNumSent;
    if (WSASend(acceptSocket, &(pPerIoData->wsaBuf), 1, &dwNumSent, 0, &(pPerIoData->Overlapped), NULL) == SOCKET_ERROR)
    {
        if (WSAGetLastError() != WSA_IO_PENDING)
        {
            delete pPerIoData;
            return 0;
        }
    }  

    while (TRUE)  
        Sleep(1000);  

    shutdown(acceptSocket, SD_BOTH);  
    closesocket(acceptSocket);  

    WSACleanup();  
    return 0;  
}
于 2012-06-19T00:37:06.003 回答
0

您看过 MSDN 文档中的WSARecv示例吗?

从本质上讲,您必须首先启动异步 WSARecv 操作,然后通过完成端口获得其完成的通知。

或者换一种说法:Windows I/O 完成端口使用的是前摄器模型(与 Linux/FreeBSD/NetBSD 的反应器模型相反)。

于 2012-06-12T18:21:05.380 回答