1

我有一个允许数千个客户端连接的套接字应用程序。它将它们存储在 a 中ConcurrentDictionary<int, Socket>并且仅针对请求-响应情况运行:

  • 当我需要数据时,我会找到相关的套接字并发送一个请求,请求我需要的数据。
  • 发送请求后,我会收到字节,直到它发送响应。然后我停止接收。

像这样:

public Task<Message> Request(int clientId, Message message)
{
    Socket client;
    return Clients.TryGetValue(clientId, out client)
        ? RequestInternal(client, message);
        : _EmptyTask;
}

public async Task<Message> RequestInternal(Socket client, Message message)
{
    await SendAsync(client, message).ConfigureAwait(false);
    return await ReceiveOneAsync(client).ConfigureAwait(false);
}

现在我需要更改此应用程序以允许客户随时向我发送任何内容;即使没有我提出要求。这 - 我认为 - 将需要不断地从套接字接收和完全不同的方法。

问题:

  1. 这种应用程序的已知方法(最佳实践)是什么?
  2. 您可以告诉我任何问题或您可以指出我的任何指南吗?

我的想法:
免责声明:这部分有点长,完全是假设性的。如果您对上述问题有答案,则可以跳过。

我的想法是:

  • 不断接收字节并将组装好的 PDU 添加到BlockingCollection<Message>.
  • BlockingCollection创建一个线程,专用于使用'方法处理接收到的消息GetConsumingEnumerable

处理线程将执行此操作:

foreach (var message in Messages.GetConsumingEnumerable())
    ProcessMessage(message);

有了这个,我可以接收和处理客户端发送的所有内容,但将发送来回复我的请求的消息与发送的消息区分开来,因为客户端需要这将是一个问题。

我想我可以随请求发送一个唯一标识符字节(该特定客户端唯一)。然后客户端可以在响应中将该标识符发回给我,我可以使用它来区分响应。

ProcessMessage(Message msg)
{
    // msg is a message from msg.Sender.

    if (msg.Id == 0)
    {
        // msg is not a response, do processing.
    }
    else
    {
        // msg is a response to the message that's sent with msg.Id.
        // Find the request that:
        // * ...is made to msg.Sender
        // * ...and has the msg.Id as identifier.
        // And process the response according to that.
    }
}

这意味着我还必须存储请求。这是一个假设的版本RequestInternal编辑: 在 Stephen Cleary 的回答之后用 s替换Wait了调用。await

private async Task RequestInternal(Socket client, Message message)
{
    var request = new Request(client, message);
    Requests.Add(request);

    await SendAsync(client, message).ConfigureAwait(false);
    return await request.Source.Task.ConfigureAwait(false);
}

Request班级:

private sealed class Request
{
    public readonly byte Id;
    public readonly Socket Client;
    public readonly Message Message;
    public readonly TaskCompletionSource<Message> Source;

    public Request(Socket client, Message message)
    {
        Client = client;
        Message = message;
        Source = new TaskCompletionSource<Message>();

        // Obtain a byte unique to that socket...
        Id = GetId(client);
    }
}

ProcessMessage变成这样:

ProcessMessage(Message msg)
{
    if (msg.Id == 0)
        OnReceived(msg); // To raise an event.
    else
    {
        // Method to find a request using msg.Sender and msg.Id
        var request = Requests.Find(msg);

        if (request != null)
            request.Source.SetResult(msg);
    }
}

虽然我不知道是什么样的集合类型Requests

编辑:我使用了ConcurrentDictionary<Key, Request>whereKey是一个私有结构,带有一个Int32(套接字的 ID)和一个Byte(消息的 ID)字段。它还实现了IEquatable<T>.

4

1 回答 1

1

几年前,我写了一篇TCP/IP .NET Sockets FAQ来解决一些常见问题(例如消息帧连续阅读常见错误的解释)。代码示例都使用Socket该类,但相同的概念适用于所有 TCP/IP 套接字。

关于您的协议设计和请求/响应匹配,整体方法听起来不错。您需要确保您是线程安全的(例如,Requests可能是 a ConcurrentDictionary)。此外,您应该await SendAsync而不是调用Wait.

我已经尝试过但尚未投入生产的另一种方法是基于TPL Dataflow。您可以为每个客户端创建一个表示“输出”的块,为“输入”创建另一个块。然后,您可以在其上分层您的消息框架,并在其上分层您的请求/响应匹配,然后将任何剩余的(未经请求的)消息发送到单个 shared BufferBlock

因此,您的“最终用户”API 最终将如下所示:

// Send a request and asynchronously receive a matching response.
Task<Message> RequestAsync(int clientId, Message message);

// Endpoint for unsolicited messages.
IReceivableSourceBlock<Tuple<int, Message>> UnsolicitedMessages { get; }

然后,您可以连接一个ActionBlocktoUnsolicitedMessages来执行一个委托,只要有人进来。

于 2012-12-21T16:49:06.703 回答