我有一个简单的消息传递框架,它围绕附加到多个“客户端”实例的“主机”的单个实例构建。每个实例通过向其推送消息来与另一个实例通信。然后,实例使用单独的任务按照接收消息的顺序处理它们自己的消息。
这是代码的简化版本:
interface IMessage
{
}
class Host
{
ConcurrentBag<Client> _Clients = new ConcurrentBag<Client>();
BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();
public Host()
{
Task.Factory.StartNew(() =>
{
foreach (var message in _IncomingMessages.GetConsumingEnumerable())
{
ProcessIncomingMessage(message);
}
});
}
public void AddClient(Client client)
{
_Clients.Add(client);
}
private void ProcessIncomingMessage(IMessage message)
{
// consume the message and typically generate new messages
// For now just echo the message back
Broadcast(message);
}
private void Broadcast(IMessage message)
{
foreach (var client in _Clients)
{
client.PushMessage(message);
}
}
}
class Client
{
private Host _Host;
private BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();
public Client(Host host)
{
Task.Factory.StartNew(() =>
{
foreach (var message in _IncomingMessages.GetConsumingEnumerable())
{
ProcessIncomingMessage(message);
}
});
_Host = host;
_Host.AddClient(this);
}
public void PushMessage(IMessage message)
{
_IncomingMessages.Add(message);
}
private void ProcessIncomingMessage(IMessage message)
{
// interpret the message and update state
}
}
我的要求是每个处理它们的消息的实例都使用并行执行(如果可能的话),并且每条消息都按照每个班级一次收到一条的顺序进行处理。
生成仅从 GetConsumingEnumerable() 中提取的新任务是不是一个坏主意?
我觉得我可以使用响应式扩展和 IObserveable 来简化这一点,但我对使用 Rx 还是很陌生,并且不确定如何构建它来满足我的并行执行需求。
任何指导将不胜感激!