0

我正在尝试使用命名管道编写 IPC。

服务器代码: http: //pastebin.com/tHyAv0e0

客户端代码: http: //pastebin.com/Qd0yGBca

我的问题是关于服务器的。跟随 SO 用户,我正在尝试在服务器代码中使用 BindIoCompletionCallback() 。服务器由以下功能组成:

  • print_last_error :打印最后一个错误的可读消息
  • IocpThreadProc :传递给 BindIoCompletionCallback() 的回调,它调用 ConnectNamedPipe() 和 ReadFile()
  • server_new :创建命名管道并尝试连接到管道的另一端(即客户端),创建退出事件并调用 BindIoCompletionCallback()
  • server_del :应该释放资源
  • 具有无限循环的主函数,等待退出事件发出信号。

当客户端连接时,它会发送消息“salut, c'est le client !”。我已将 ReadFile() 的缓冲区设置为 5,以测试我必须多次调用 ReadFile() 的情况。我有以下输出:

connection pending...
waiting for client...
 ** 0, 0
reading data
 * ReadFile : 0
 ** 0, 5
msg:
reading data
 ** 0, 5
 * ReadFile : 5
reading data
msg: , c'e
 * ReadFile : 5
 ** 0, 5
msg: st le
reading data
 * ReadFile : 5
 ** 0, 5
msg:  clie
reading data
 * ReadFile : 5
 ** 0, 4
msg: nt !~
reading data
IO_PENDING
 ** -1073741493, 0
reading data
unexpected error failed with error 109: Le canal de communication a ÚtÚ fermÚ.
WaitForSingleObject : 0

以 ** 开头的行:它打印回调的参数

以 'msg' 开头的行:它打印由 Readfile 填充的缓冲区的消息

由于客户端发送的消息长度为 24,我通常应该得到这 5 条消息(每条消息都是 5 个字符,除了最后一个,是 4 个字符):

salut
, c'e
st le 
 clie
nt !

但我不能拥有消息的第一部分(即:“salut”)。当 I/O 操作完成时调用回调,可能是第一部分。但是我没有成功调用 ReadFile() 以获取消息的第一部分。我试图在主函数的主循环中、线程中、server_new() 等中调用 ReadFile()……除了正确的方法之外的所有内容。

有人知道如何解决这个问题吗?

谢谢你

4

1 回答 1

2

您的代码包含大量基本错误。更确切地说是所有代码 - 一个完整的错误

查看代码片段(在IocpThreadProcserver_new

    char buf[READ_BUFSIZE];
    ret = ReadFileEx(svr->pipe, buf, sizeof(buf), &svr->ol, IocpThreadProc);

char buf[READ_BUFSIZE]- 这是函数中的局部变量。从函数退出后 - 这将成为堆栈中的任意地址。因此,当读取操作完成时 - 这更快地破坏了您的堆栈或将是未定义的结果。所以这是错误的。您必须不将堆栈内存作为读取缓冲区传递,或者在读取操作完成之前不退出函数

IocpThreadProc作为参数传递给ReadFileEx

lp完成例程

指向完成例程的指针,当读取操作完成并且调用线程处于 警报等待状态时将被调用。

但你永远不会等待警报状态!

以后你用

BindIoCompletionCallback(svr->pipe, IocpThreadProc, 0);

但是将文件绑定到 IOCP 并使用 APC 完成 (lpCompletionRoutine ) 是互斥的。如果说你BindIoCompletionCallback 之前 ReadFileEx(.., IocpThreadProc)打电话- 你会得到错误ERROR_INVALID_PARAMETER

来自 NtReadFile 源代码:

        //
        // If this file has an I/O completion port associated w/it, then
        // ensure that the caller did not supply an APC routine, as the
        // two are mutually exclusive methods for I/O completion
        // notification.
        //

        if (fileObject->CompletionContext && IopApcRoutinePresent( ApcRoutine )) {
            ObDereferenceObject( fileObject );
            return STATUS_INVALID_PARAMETER;
        }

您的代码“工作”只是因为您在call之后绑定了 IOCP ReadFileEx(.., IocpThreadProc)。但是当读取操作完成时会发生什么?APC (for IocpThreadProc) 将被插入到线程和数据包排队到 IOCP。soIocpThreadProc将使用相同的数据调用两次以进行单个操作。它只调用了一次,因为您永远不会在警报状态下等待并且不会从线程中弹出 APC。

您嵌入OVERLAPPED到服务器 - 这是错误。每个异步 I/O 都必须是唯一的。 OVERLAPPED更确切地说,您必须定义自己的类,该类继承自OVERLAPPED. 在这个类中有指向Server的指针,操作代码,可能是一些额外的数据。您需要在每次 I/O 操作之前分配此结构并在完成时释放它。

GetLastError()IocpThreadProc!!!

你需要DWORD dwErrorCode在这里使用,GetLastError()没有意义,因为这里在另一个线程上调用,绝对与操作无关。并且因为这是来自内核的回调,这里NTSTATUS的值确实在 dwErrorCode 中,但不是 win32 错误。例如,在阅读完成时,你可以得到STATUS_PIPE_BROKEN但不是ERROR_BROKEN_PIPE,但这个在 MSDN 文档中已经很大的缺陷

代码示例:

class __declspec(novtable) IoObject
{
    friend struct UIRP;

    LONG _dwRef;

public:

    ULONG AddRef()
    {
        return InterlockedIncrement(&_dwRef);
    }

    ULONG Release()
    {
        ULONG dwRef = InterlockedDecrement(&_dwRef);

        if (!dwRef)
        {
            delete this;
        }

        return dwRef;
    }

protected:

    IoObject()
    {
        _dwRef = 1;
    }

    virtual ~IoObject() 
    {
    };

    virtual void OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered) = 0;
};

struct UIRP : OVERLAPPED
{
    IoObject* _obj;
    PVOID _buf;
    ULONG _op;

    UIRP(IoObject* obj, ULONG op, PVOID buf = 0)
    {
        RtlZeroMemory(static_cast<OVERLAPPED*>(this), sizeof(OVERLAPPED));
        _obj = obj;
        obj->AddRef();
        _op = op;
        _buf = buf;
    }

    void CheckError(BOOL f)
    {
        if (!f)
        {
            DWORD dwErrorCode = RtlGetLastNtStatus();

            if (dwErrorCode != STATUS_PENDING)
            {
                OnComplete(dwErrorCode, 0);
            }
        }
    }

    ~UIRP()
    {
        _obj->Release();
    }

    static BOOL BindIoCompletion(HANDLE hObject)
    {
        return BindIoCompletionCallback(hObject, _OnComplete, 0);
    }

private:

    static void WINAPI _OnComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
    {
        static_cast<UIRP*>(lpOverlapped)->OnComplete(dwErrorCode, dwNumberOfBytesTransfered);
    }

    void OnComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered)
    {
        _obj->OnComplete(dwErrorCode, _op, _buf, dwNumberOfBytesTransfered);
        delete this;
    }
};

class __declspec(novtable) CPipe : public IoObject
{
    enum {
        pipe_connect, pipe_read, pipe_write
    };
protected:
    HANDLE _pipe;
    PBYTE _buf;
    ULONG _dataSize;
    ULONG _bufferSize;

public:

    CPipe()
    {
        _pipe = INVALID_HANDLE_VALUE;
        _buf = 0;
        _dataSize = 0;
        _bufferSize = 0;
    }

    BOOL Create(ULONG bufferSize, PCWSTR name);

    BOOL Listen();

    BOOL Write(const void* data, ULONG cb);

    BOOL Disconnect()
    {
        if (IsServer())
        {
            return DisconnectNamedPipe(_pipe);
        }

        HANDLE pipe = InterlockedExchangePointer(&_pipe, INVALID_HANDLE_VALUE);

        if (pipe != INVALID_HANDLE_VALUE)
        {
            CloseHandle(pipe);
        }

        return TRUE;
    }

protected:

    BOOL Read();// usually never call direct

    virtual BOOL OnRead(PVOID buf, ULONG cbTransferred) = 0;

    virtual BOOL OnConnect() = 0;   

    virtual void OnDisconnect() = 0;

    virtual BOOL IsServer() = 0;

    virtual void OnWrite(DWORD /*dwErrorCode*/)
    {
    }

    virtual ~CPipe()
    {
        HANDLE pipe = InterlockedExchangePointer(&_pipe, INVALID_HANDLE_VALUE);

        if (pipe != INVALID_HANDLE_VALUE)
        {
            CloseHandle(pipe);
        }

        if (_buf)
        {
            delete _buf;
        }
    }

private:

    virtual void OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered);
};

void CPipe::OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered)
{
    DbgPrint("%u>%s<%p>(%x, %x, %x)\n", IsServer(), __FUNCTION__, this, dwErrorCode, op, dwNumberOfBytesTransfered);

    switch (op)
    {
    case pipe_read:

        switch(dwErrorCode) 
        {
        case STATUS_SUCCESS:
            if (OnRead(buf, dwNumberOfBytesTransfered)) Read();
            break;

        case STATUS_PIPE_BROKEN:        // pipe handle has been closed, server must call DisconnectNamedPipe
        case STATUS_CANCELLED:          // CancelIo[Ex] called
            Disconnect();

        case STATUS_PIPE_DISCONNECTED:  // server call DisconnectNamedPipe
        case STATUS_INVALID_HANDLE:     // we close handle
            OnDisconnect();
            break;

        default:__debugbreak();
        }
        break;

    case pipe_connect:

        switch(dwErrorCode) 
        {
        case STATUS_SUCCESS:            // ERROR_SUCCESS 
        case STATUS_PIPE_CONNECTED:     // ERROR_PIPE_CONNECTED
        case STATUS_PIPE_CLOSING:       // ERROR_NO_DATA (really client can send data before disconnect, exist sense do read)
            if (OnConnect()) Read();
            break;
        case STATUS_PIPE_BROKEN:        // server call CloseHandle before ConnectNamedPipe complete
        case STATUS_PIPE_DISCONNECTED:  // server call DisconnectNamedPipe before ConnectNamedPipe
        case STATUS_CANCELLED:          // server call CancelIo[Ex]
            break;
        default: __debugbreak();
        }
        break;

    case pipe_write:
        OnWrite(dwErrorCode);
        LocalFree(buf);
        break;

    default: __debugbreak();
    }
}

BOOL CPipe::Create(ULONG bufferSize, PCWSTR name)
{
    if (_buf = new UCHAR[bufferSize])
    {
        _bufferSize = bufferSize;
    }
    else
    {
        return FALSE;
    }

    static WCHAR pipeprefix[] = L"\\\\?\\pipe\\";
    PWSTR path = (PWSTR)alloca(wcslen(name) * sizeof(WCHAR) + sizeof(pipeprefix));
    wcscat(wcscpy(path, pipeprefix), name);

    BOOL bServer = IsServer();

    _pipe = bServer 
        ?
    CreateNamedPipeW(path,
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
        PIPE_UNLIMITED_INSTANCES,
        PAGE_SIZE, PAGE_SIZE, INFINITE, NULL)
        :
    CreateFile(path, FILE_READ_ATTRIBUTES|FILE_READ_DATA|
        FILE_WRITE_ATTRIBUTES|FILE_WRITE_DATA, FILE_SHARE_READ|FILE_SHARE_WRITE, 0, OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED, 0);

    if (_pipe == INVALID_HANDLE_VALUE || !UIRP::BindIoCompletion(_pipe))
    {
        return FALSE;
    }

    return bServer ? Listen() : OnComplete(0, pipe_connect, 0, 0), TRUE;
}

BOOL CPipe::Listen()
{
    if (UIRP* irp = new UIRP(this, pipe_connect))
    {
        irp->CheckError(ConnectNamedPipe(_pipe, irp));

        return TRUE;
    }

    return FALSE;
}

BOOL CPipe::Read()
{
    ULONG NumberOfBytesToRead = _bufferSize - _dataSize;

    if (!NumberOfBytesToRead)
    {
        return FALSE;
    }

    PVOID buf = _buf + _dataSize;

    if (UIRP* irp = new UIRP(this, pipe_read, buf))
    {
        irp->CheckError(ReadFile(_pipe, buf, NumberOfBytesToRead, 0, irp));

        return TRUE;
    }

    return FALSE;
}

BOOL CPipe::Write(const void* data, ULONG cb)
{
    if (PVOID buf = LocalAlloc(0, cb))
    {
        if (UIRP* irp = new UIRP(this, pipe_write, buf))
        {
            memcpy(buf, data, cb);

            irp->CheckError(WriteFile(_pipe, buf, cb, 0, irp));

            return TRUE;
        }
    }

    return FALSE;
}

class ServerPipe : public CPipe
{
    virtual BOOL OnRead(PVOID buf, ULONG cbTransferred)
    {
        DbgPrint("%.*s\n", cbTransferred, buf);

        char sz[256];
        Write(sz, 1 + sprintf(sz, "response from %p server\n", this));

        return TRUE;
    }

    virtual BOOL OnConnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);

        return TRUE;
    }

    virtual void OnDisconnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
        Listen();//
    }

    virtual BOOL IsServer()
    {
        return TRUE;
    }

    virtual ~ServerPipe()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
    }
};

class ClientPipe : public CPipe
{
    int _n;

    virtual BOOL OnRead(PVOID buf, ULONG cbTransferred)
    {
        DbgPrint("%.*s\n", cbTransferred, buf);

        if (--_n)
        {
            char sz[256];
            Write(sz, 1 + sprintf(sz, "request[%u] from %p client\n", _n, this));
            return TRUE;
        }
        return FALSE;
    }

    virtual BOOL OnConnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);

        _n = 3;

        char sz[256];
        Write(sz, 1 + sprintf(sz, "hello from %p client\n", this));

        return TRUE;
    }

    virtual void OnDisconnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
    }

    virtual BOOL IsServer()
    {
        return FALSE;
    }

    virtual ~ClientPipe()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
    }
};

DWORD CALLBACK ClientThread(void* name)
{
    int n = 2;
    do 
    {
        MessageBox(0,0,L"client",MB_ICONWARNING);
        if (ClientPipe* p = new ClientPipe)
        {
            p->Create(PAGE_SIZE, (PCWSTR)name);
            p->Release();
        }
    } while (--n);

    return 0;
}

void pipeTest()
{
    static WCHAR sname[] = L"__test_pipe__";

    if (HANDLE hThread = CreateThread(0, 0, ClientThread, sname, 0, 0))
    {
        CloseHandle(hThread);
    }

    if (ServerPipe* p = new ServerPipe)
    {
        p->Create(PAGE_SIZE, sname);
        p->Release();
    }

    MessageBox(0,0,0,0);
}

大约DWORD dwErrorCode

VOID CALLBACK FileIOCompletionRoutine(
  __in  DWORD dwErrorCode,
  __in  DWORD dwNumberOfBytesTransfered,
  __in  LPOVERLAPPED lpOverlapped
);

BindIoCompletionCallback文档中存在不明确

返回值

如果函数成功,则返回值非零。

如果函数失败,则返回值为零。要获取扩展错误信息,请调用GetLastError函数。返回的值是一个 NTSTATUS 错误代码。要检索相应的系统错误代码,请使用RtlNtStatusToDosError函数。

The value returned is an NTSTATUS error code 是什么意思?什么返回值?

这是DWORD dwErrorCodeFileIOCompletionRoutine

实际上,我们将内核模式指针传递给(实际上是IO_STATUS_BLOCK前 2 个成员)。当异步操作完成 - 内核填充并将数据包排队到 IOCP(或 APC 到线程)。ntdll从 IOCP 中提取(所以我们得到了指向我们传递给 I/O api 的指针),并填充OVERLAPPEDIO_STATUS_BLOCKIO_STATUS_BLOCKPIO_STATUS_BLOCKOVERLAPPED

dwErrorCode = Iosb->Status, 
dwNumberOfBytesTransfered = (ULONG)Iosb->Information, 
lpOverlapped = (LPOVERLAPPED)Iosb; 

系统不做转换

dwErrorCode = RtlNtStatusToDosError(Iosb->Status)

但直接分配NTSTATUSDWORD dwErrorCode- 所以在FileIOCompletionRoutine我们必须比较dwErrorCode的不是 wi32 错误代码,而是 NTSTATUS 代码(来自"ntstatus.h"

所以我们从来没有 在 FileIOCompletionRoutine 中看到过,但是ERROR_BROKEN_PIPE或者ERROR_PIPE_NOT_CONNECTEDSTATUS_PIPE_BROKENSTATUS_PIPE_DISCONNECTED


并使用新的线程池 API代替代码示例BindIoCompletionCallback。这里的一大优势是 in IoCompletionCallback( PTP_WIN32_IO_CALLBACK) 回调函数ULONG IoResult已经使用了 win32 错误,但不是原始 NTSTATUS (IoResult = RtlNtStatusToDosError(Iosb->Status)并注意ULONG_PTR NumberOfBytesTransferred(与ULONG dwNumberOfBytesTransferedfrom FileIOCompletionRoutine( LPOVERLAPPED_COMPLETION_ROUTINE) 回调函数并将其与ULONG_PTR Informationfrom进行比较IO_STATUS_BLOCK。)

#define StartIo(irp, pio, f) StartThreadpoolIo(_pio); irp->CheckError(f, _pio);

class __declspec(novtable) IoObject
{
    friend struct UIRP;

    LONG _dwRef;

public:

    ULONG AddRef()
    {
        return InterlockedIncrement(&_dwRef);
    }

    ULONG Release()
    {
        ULONG dwRef = InterlockedDecrement(&_dwRef);

        if (!dwRef)
        {
            delete this;
        }

        return dwRef;
    }

protected:

    IoObject()
    {
        _dwRef = 1;
    }

    virtual ~IoObject() 
    {
    };

    virtual void OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered) = 0;
};

struct UIRP : OVERLAPPED
{
    IoObject* _obj;
    PVOID _buf;
    ULONG _op;

    UIRP(IoObject* obj, ULONG op, PVOID buf = 0)
    {
        RtlZeroMemory(static_cast<OVERLAPPED*>(this), sizeof(OVERLAPPED));
        _obj = obj;
        obj->AddRef();
        _op = op;
        _buf = buf;
    }

    void CheckError(BOOL f, PTP_IO pio)
    {
        if (!f)
        {
            DWORD dwErrorCode = GetLastError();

            if (dwErrorCode != ERROR_IO_PENDING)
            {
                CancelThreadpoolIo(pio);
                OnComplete(dwErrorCode, 0);
            }
        }
    }

    ~UIRP()
    {
        _obj->Release();
    }

    static PTP_IO BindIoCompletion(HANDLE hObject)
    {
        return CreateThreadpoolIo(hObject, _IoCompletionCallback, 0, 0);
    }

private:

    static VOID CALLBACK _IoCompletionCallback(
        __inout      PTP_CALLBACK_INSTANCE /*Instance*/,
        __inout_opt  PVOID /*Context*/,
        __inout_opt  PVOID Overlapped,
        __in         ULONG IoResult,
        __in         ULONG_PTR NumberOfBytesTransferred,
        __inout      PTP_IO /*Io*/
        )
    {
        static_cast<UIRP*>(Overlapped)->OnComplete(IoResult, (ULONG)NumberOfBytesTransferred);
    }

    void OnComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered)
    {
        _obj->OnComplete(dwErrorCode, _op, _buf, dwNumberOfBytesTransfered);
        delete this;
    }
};

class __declspec(novtable) CPipe : public IoObject
{
    enum {
        pipe_connect, pipe_read, pipe_write
    };
protected:
    HANDLE _pipe;
    PTP_IO _pio;
    PBYTE _buf;
    ULONG _dataSize;
    ULONG _bufferSize;

public:

    CPipe()
    {
        _pipe = INVALID_HANDLE_VALUE;
        _buf = 0;
        _dataSize = 0;
        _bufferSize = 0;
        _pio = 0;
    }

    BOOL Create(ULONG bufferSize, PCWSTR name);

    BOOL Listen();

    BOOL Write(const void* data, ULONG cb);

    BOOL Disconnect()
    {
        if (IsServer())
        {
            return DisconnectNamedPipe(_pipe);
        }

        HANDLE pipe = InterlockedExchangePointer(&_pipe, INVALID_HANDLE_VALUE);

        if (pipe != INVALID_HANDLE_VALUE)
        {
            CloseHandle(pipe);
        }

        return TRUE;
    }

protected:

    BOOL Read();// usually never call direct

    virtual BOOL OnRead(PVOID buf, ULONG cbTransferred) = 0;

    virtual BOOL OnConnect() = 0;   

    virtual void OnDisconnect() = 0;

    virtual BOOL IsServer() = 0;

    virtual void OnWrite(DWORD /*dwErrorCode*/)
    {
    }

    virtual ~CPipe()
    {
        if (_pio)
        {
            CloseThreadpoolIo(_pio);
        }

        HANDLE pipe = InterlockedExchangePointer(&_pipe, INVALID_HANDLE_VALUE);

        if (pipe != INVALID_HANDLE_VALUE)
        {
            CloseHandle(pipe);
        }

        if (_buf)
        {
            delete _buf;
        }
    }

private:

    virtual void OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered);
};

void CPipe::OnComplete(DWORD dwErrorCode, ULONG op, PVOID buf, DWORD dwNumberOfBytesTransfered)
{
    DbgPrint("%u>%s<%p>(%x, %x, %x)\n", IsServer(), __FUNCTION__, this, dwErrorCode, op, dwNumberOfBytesTransfered);

    switch (op)
    {
    case pipe_read:

        switch(dwErrorCode) 
        {
        case ERROR_SUCCESS:
            if (OnRead(buf, dwNumberOfBytesTransfered)) Read();
            break;

        case ERROR_BROKEN_PIPE:         // pipe handle has been closed , server must call DisconnectNamedPipe
        case ERROR_OPERATION_ABORTED:   // CancelIo[Ex] called
            Disconnect();

        case ERROR_PIPE_NOT_CONNECTED:  // server call DisconnectNamedPipe
        case ERROR_INVALID_HANDLE:      // we close handle
            OnDisconnect();
            break;

        default:__debugbreak();
        }
        break;

    case pipe_connect:

        switch(dwErrorCode) 
        {
        case ERROR_SUCCESS:             // client just connected 
        case ERROR_PIPE_CONNECTED:      // client already connected
        case ERROR_NO_DATA:             // client already connected and disconnected (really client can send data before disconnect, exist sense do read)
            if (OnConnect()) Read();
            break;
        case ERROR_BROKEN_PIPE:         // server call CloseHandle before ConnectNamedPipe complete
        case ERROR_PIPE_NOT_CONNECTED:  // server call DisconnectNamedPipe before ConnectNamedPipe
        case ERROR_OPERATION_ABORTED:   // server call CancelIo[Ex]
            break;
        default: __debugbreak();
        }
        break;

    case pipe_write:
        OnWrite(dwErrorCode);
        LocalFree(buf);
        break;

    default: __debugbreak();
    }
}

BOOL CPipe::Create(ULONG bufferSize, PCWSTR name)
{
    if (_buf = new UCHAR[bufferSize])
    {
        _bufferSize = bufferSize;
    }
    else
    {
        return FALSE;
    }

    static WCHAR pipeprefix[] = L"\\\\?\\pipe\\";
    PWSTR path = (PWSTR)alloca(wcslen(name) * sizeof(WCHAR) + sizeof(pipeprefix));
    wcscat(wcscpy(path, pipeprefix), name);

    BOOL bServer = IsServer();

    _pipe = bServer 
        ?
    CreateNamedPipeW(path,
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
        PIPE_UNLIMITED_INSTANCES,
        PAGE_SIZE, PAGE_SIZE, INFINITE, NULL)
        :
    CreateFile(path, FILE_READ_ATTRIBUTES|FILE_READ_DATA|
        FILE_WRITE_ATTRIBUTES|FILE_WRITE_DATA, FILE_SHARE_READ|FILE_SHARE_WRITE, 0, OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED, 0);

    if (_pipe == INVALID_HANDLE_VALUE || !(_pio = UIRP::BindIoCompletion(_pipe)))
    {
        return FALSE;
    }

    return bServer ? Listen() : OnComplete(0, pipe_connect, 0, 0), TRUE;
}

BOOL CPipe::Listen()
{
    if (UIRP* irp = new UIRP(this, pipe_connect))
    {
        StartIo(irp, _pio, ConnectNamedPipe(_pipe, irp));

        return TRUE;
    }

    return FALSE;
}

BOOL CPipe::Read()
{
    ULONG NumberOfBytesToRead = _bufferSize - _dataSize;

    if (!NumberOfBytesToRead)
    {
        return FALSE;
    }

    PVOID buf = _buf + _dataSize;

    if (UIRP* irp = new UIRP(this, pipe_read, buf))
    {
        StartIo(irp, _pio, ReadFile(_pipe, buf, NumberOfBytesToRead, 0, irp));

        return TRUE;
    }

    return FALSE;
}

BOOL CPipe::Write(const void* data, ULONG cb)
{
    if (PVOID buf = LocalAlloc(0, cb))
    {
        if (UIRP* irp = new UIRP(this, pipe_write, buf))
        {
            memcpy(buf, data, cb);

            StartIo(irp, _pio, WriteFile(_pipe, buf, cb, 0, irp));

            return TRUE;
        }
    }

    return FALSE;
}

class ServerPipe : public CPipe
{
    virtual BOOL OnRead(PVOID buf, ULONG cbTransferred)
    {
        DbgPrint("%.*s\n", cbTransferred, buf);

        char sz[256];
        Write(sz, 1 + sprintf(sz, "response from %p server\n", this));

        return TRUE;
    }

    virtual BOOL OnConnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);

        return TRUE;
    }

    virtual void OnDisconnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
        Listen();//
    }

    virtual BOOL IsServer()
    {
        return TRUE;
    }

    virtual ~ServerPipe()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
    }
};

class ClientPipe : public CPipe
{
    int _n;

    virtual BOOL OnRead(PVOID buf, ULONG cbTransferred)
    {
        DbgPrint("%.*s\n", cbTransferred, buf);

        if (--_n)
        {
            char sz[256];
            Write(sz, 1 + sprintf(sz, "request[%u] from %p client\n", _n, this));
            return TRUE;
        }

        return FALSE;
    }

    virtual BOOL OnConnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);

        _n = 3;

        char sz[256];
        Write(sz, 1 + sprintf(sz, "hello from %p client\n", this));

        return TRUE;
    }

    virtual void OnDisconnect()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
    }

    virtual BOOL IsServer()
    {
        return FALSE;
    }

    virtual ~ClientPipe()
    {
        DbgPrint("%s<%p>\n", __FUNCTION__, this);
    }
};

DWORD CALLBACK ClientThread(void* name)
{
    int n = 2;
    do 
    {
        MessageBox(0,0,L"client",MB_ICONWARNING);
        if (ClientPipe* p = new ClientPipe)
        {
            p->Create(PAGE_SIZE, (PCWSTR)name);
            p->Release();
        }
    } while (--n);

    return 0;
}

void pipeTest()
{
    static WCHAR sname[] = L"__test_pipe__";

    if (HANDLE hThread = CreateThread(0, 0, ClientThread, sname, 0, 0))
    {
        CloseHandle(hThread);
    }

    if (ServerPipe* p = new ServerPipe)
    {
        p->Create(PAGE_SIZE, sname);
        p->Release();
    }

    MessageBox(0,0,0,0);
}
于 2016-12-25T16:34:37.457 回答