9

问题

设计一个高效且非常快速的命名管道客户端服务器框架。

当前状态

我已经拥有经过实战验证的生产测试框架。它很快,但是每个管道连接使用一个线程,如果有很多客户端,线程数可能很快就会很高。我已经使用了可以根据需要扩展的智能线程池(实际上是任务池)。

我已经对管道使用了 OVERLAPED 模式,但后来我用 WaitForSingleObject 或 WaitForMultipleObjects 阻塞,这就是为什么我需要在服务器端的每个连接一个线程

所需的解决方案:

客户端很好,但在服务器端,我只想为每个客户端请求而不是每个连接使用一个线程。因此,我不会在客户端的整个生命周期(连接/断开连接)中使用一个线程,而是每个任务使用一个线程。因此,仅当客户端请求数据时,仅此而已。

我在 MSDN 上看到一个示例,它使用 OVERLAPED 结构数组,然后使用 WaitForMultipleObjects 等待它们。我觉得这是一个糟糕的设计。我在这里看到两个问题。首先,您必须维护一个可以变得非常大的数组,并且删除将是昂贵的。其次,您有很多事件,每个数组成员都有一个事件。

我还看到了完成端口,例如CreateIoCompletionPortGetQueuedCompletionStatus,但我看不出它们有什么更好的地方。

我想要的是ReadFileExWriteFileEx做的事情,它们在操作完成时调用一个回调例程。这是一种真正的异步编程风格。但问题是 ConnectNamedPipe 不支持它,而且我看到线程需要处于警报状态,您需要调用一些 *Ex 函数来获得它。

那么如何最好地解决这样的问题呢?

以下是 MSDN 的做法:http: //msdn.microsoft.com/en-us/library/windows/desktop/aa365603 (v=vs.85).aspx

我用这种方法看到的问题是,如果WaitForMultipleObjects的限制是 64 个句柄,我看不到如何同时连接 100 个客户端。当然我可以在每次请求后断开管道,但想法是像在 TCP 服务器中一样拥有一个永久的客户端连接,并在整个生命周期中跟踪客户端,每个客户端都有唯一的 ID 和客户端特定的数据。

理想的伪代码应该是这样的:

repeat
  // wait for the connection or for one client to send data
  Result = ConnectNamedPipe or ReadFile or Disconnect; 

  case Result of
    CONNECTED: CreateNewClient; // we create a new client
    DATA: AssignWorkerThread; // here we process client request in a thread
    DISCONNECT: CleanupAndDeleteClient // release the client object and data
  end;
until Aborted;

这样我们就只有一个监听线程来接受连接/断开/onData 事件。线程池(工作线程)只处理实际请求。这样 5 个工作线程可以为许多连接的客户端提供服务。

PS我当前的代码应该不重要。我在 Delphi 中对此进行了编码,但它是纯 WinAPI,因此语言无关紧要。

编辑:

现在 IOCP 看起来像解决方案:

I/O 完成端口为在多处理器系统上处理多个异步 I/O 请求提供了一个高效的线程模型。当一个进程创建一个 I/O 完成端口时,系统会为请求创建一个关联的队列对象,这些请求的唯一目的是为这些请求提供服务。处理许多并发异步 I/O 请求的进程可以通过将 I/O 完成端口与预先分配的线程池结合使用,而不是通过在接收 I/O 请求时创建线程来更快、更有效地完成此操作。

4

2 回答 2

3

如果服务器必须处理超过 64 个事件(读/写),那么任何使用 WaitForMultipleObjects 的解决方案都变得不可行。这就是微软向 Windows 引入 IO 完成端口的原因。它可以使用最合适的线程数(通常是处理器/内核的数量)来处理非常多的 IO 操作。

IOCP的问题是很难正确实施。隐藏的问题像地雷一样在现场传播:[ 1 ]、[ 2 ](第 3.6 节)。我建议使用一些框架。小谷歌搜索建议为 Delphi 开发人员提供称为Indy的东西。也许还有其他人。

在这一点上,如果这意味着编写我自己的 IOCP 实现,我会忽略对命名管道的要求。这不值得悲伤。

于 2013-07-23T11:26:48.620 回答
1

我认为您忽略的是在任何给定时间您只需要几个侦听命名管道实例。连接管道实例后,您可以关闭该实例并创建一个新的侦听实例来替换它。

使用MAXIMUM_WAIT_OBJECTS(或更少)侦听命名管道实例,您可以拥有一个专用于侦听的线程WaitForMultipleObjectsExReadFileEx同一个线程还可以使用andWriteFileEx和 APC处理其余的 I/O 。工作线程会将 APC 排队到 I/O 线程以启动 I/O,并且 I/O 线程可以使用任务池返回结果(以及让工作线程知道新连接)。

I/O 线程主函数看起来像这样:

create_events();
for (index = 0; index < MAXIMUM_WAIT_OBJECTS; index++) new_pipe_instance(i);

for (;;)
{
    if (service_stopping && active_instances == 0) break;

    result = WaitForMultipleObjectsEx(MAXIMUM_WAIT_OBJECTS, connect_events, 
                    FALSE, INFINITE, TRUE);

    if (result == WAIT_IO_COMPLETION) 
    {
        continue;
    }
    else if (result >= WAIT_OBJECT_0 && 
                     result < WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS) 
    {
        index = result - WAIT_OBJECT_0;
        ResetEvent(connect_events[index]);

        if (GetOverlappedResult(
                connect_handles[index], &connect_overlapped[index], 
                &byte_count, FALSE))
            {
                err = ERROR_SUCCESS;
            }
            else
            {
                err = GetLastError();
            }

        connect_pipe_completion(index, err);
        continue;
    }
    else
    {
        fail();
    }
}

唯一真正的复杂性是,当您调用ConnectNamedPipe它时,它可能会返回ERROR_PIPE_CONNECTED指示调用立即成功或错误,而不是ERROR_IO_PENDING调用立即失败。在这种情况下,您需要重置事件然后处理连接:

void new_pipe(ULONG_PTR dwParam)
{
    DWORD index = dwParam;

    connect_handles[index] = CreateNamedPipe(
        pipe_name, 
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_MESSAGE | PIPE_WAIT | PIPE_ACCEPT_REMOTE_CLIENTS,
        MAX_INSTANCES,
        512,
        512,
        0,
        NULL);

    if (connect_handles[index] == INVALID_HANDLE_VALUE) fail();

    ZeroMemory(&connect_overlapped[index], sizeof(OVERLAPPED));
    connect_overlapped[index].hEvent = connect_events[index];

    if (ConnectNamedPipe(connect_handles[index], &connect_overlapped[index])) 
    {
        err = ERROR_SUCCESS;
    }
    else
    {
        err = GetLastError();

        if (err == ERROR_SUCCESS) err = ERROR_INVALID_FUNCTION;

        if (err == ERROR_PIPE_CONNECTED) err = ERROR_SUCCESS;
    }

    if (err != ERROR_IO_PENDING) 
    {
        ResetEvent(connect_events[index]);
        connect_pipe_completion(index, err);
    }
}

connect_pipe_completion函数将在任务池中创建一个新任务来处理新连接的管道实例,然后将一个 APC 排队调用new_pipe以在同一索引处创建一个新的监听管道。

一旦它们关闭,就可以重用现有的管道实例,但在这种情况下,我认为这不值得麻烦。

于 2013-07-24T04:46:30.543 回答