我有一个允许数千个客户端连接的套接字应用程序。它将它们存储在 a 中ConcurrentDictionary<int, Socket>
并且仅针对请求-响应情况运行:
- 当我需要数据时,我会找到相关的套接字并发送一个请求,请求我需要的数据。
- 发送请求后,我会收到字节,直到它发送响应。然后我停止接收。
像这样:
public Task<Message> Request(int clientId, Message message)
{
Socket client;
return Clients.TryGetValue(clientId, out client)
? RequestInternal(client, message);
: _EmptyTask;
}
public async Task<Message> RequestInternal(Socket client, Message message)
{
await SendAsync(client, message).ConfigureAwait(false);
return await ReceiveOneAsync(client).ConfigureAwait(false);
}
现在我需要更改此应用程序以允许客户随时向我发送任何内容;即使没有我提出要求。这 - 我认为 - 将需要不断地从套接字接收和完全不同的方法。
问题:
- 这种应用程序的已知方法(最佳实践)是什么?
- 您可以告诉我任何问题或您可以指出我的任何指南吗?
我的想法:
免责声明:这部分有点长,完全是假设性的。如果您对上述问题有答案,则可以跳过。
我的想法是:
- 不断接收字节并将组装好的 PDU 添加到
BlockingCollection<Message>
. BlockingCollection
创建一个线程,专用于使用'方法处理接收到的消息GetConsumingEnumerable
。
处理线程将执行此操作:
foreach (var message in Messages.GetConsumingEnumerable())
ProcessMessage(message);
有了这个,我可以接收和处理客户端发送的所有内容,但将发送来回复我的请求的消息与发送的消息区分开来,因为客户端需要这将是一个问题。
我想我可以随请求发送一个唯一标识符字节(该特定客户端唯一)。然后客户端可以在响应中将该标识符发回给我,我可以使用它来区分响应。
ProcessMessage(Message msg)
{
// msg is a message from msg.Sender.
if (msg.Id == 0)
{
// msg is not a response, do processing.
}
else
{
// msg is a response to the message that's sent with msg.Id.
// Find the request that:
// * ...is made to msg.Sender
// * ...and has the msg.Id as identifier.
// And process the response according to that.
}
}
这意味着我还必须存储请求。这是一个假设的版本RequestInternal
:
编辑: 在 Stephen Cleary 的回答之后用 s替换Wait
了调用。await
private async Task RequestInternal(Socket client, Message message)
{
var request = new Request(client, message);
Requests.Add(request);
await SendAsync(client, message).ConfigureAwait(false);
return await request.Source.Task.ConfigureAwait(false);
}
和Request
班级:
private sealed class Request
{
public readonly byte Id;
public readonly Socket Client;
public readonly Message Message;
public readonly TaskCompletionSource<Message> Source;
public Request(Socket client, Message message)
{
Client = client;
Message = message;
Source = new TaskCompletionSource<Message>();
// Obtain a byte unique to that socket...
Id = GetId(client);
}
}
并ProcessMessage
变成这样:
ProcessMessage(Message msg)
{
if (msg.Id == 0)
OnReceived(msg); // To raise an event.
else
{
// Method to find a request using msg.Sender and msg.Id
var request = Requests.Find(msg);
if (request != null)
request.Source.SetResult(msg);
}
}
虽然我不知道是什么样的集合类型Requests
。
编辑:我使用了ConcurrentDictionary<Key, Request>
whereKey
是一个私有结构,带有一个Int32
(套接字的 ID)和一个Byte
(消息的 ID)字段。它还实现了IEquatable<T>
.