我最近一直在使用响应式框架做一些工作,到目前为止我一直非常喜欢它。我正在考虑用一些过滤的 IObservable 替换传统的轮询消息队列,以清理我的服务器操作。以旧的方式,我处理进入服务器的消息是这样的:
// Start spinning the process message loop
Task.Factory.StartNew(() =>
{
while (true)
{
Command command = m_CommandQueue.Take();
ProcessMessage(command);
}
}, TaskCreationOptions.LongRunning);
这导致连续轮询线程将来自客户端的命令委托给 ProcessMessage 方法,在该方法中我有一系列 if/else-if 语句,这些语句确定命令的类型并根据其类型委托工作
我将使用 Reactive 的事件驱动系统替换它,为此我编写了以下代码:
private BlockingCollection<BesiegedMessage> m_MessageQueue = new BlockingCollection<BesiegedMessage>();
private IObservable<BesiegedMessage> m_MessagePublisher;
m_MessagePublisher = m_MessageQueue
.GetConsumingEnumerable()
.ToObservable(TaskPoolScheduler.Default);
// All generic Server messages (containing no properties) will be processed here
IDisposable genericServerMessageSubscriber = m_MessagePublisher
.Where(message => message is GenericServerMessage)
.Subscribe(message =>
{
// do something with the generic server message here
}
我的问题是,虽然这可行,但使用阻塞集合作为这样的 IObservable 的支持是一种好习惯吗?我没有看到 Take() 曾经以这种方式调用过,这让我认为消息会堆积在队列中而不会在处理后被删除?
将主题作为后备集合来驱动将接收这些消息的过滤后的 IObservable 会更有效吗?还有什么我在这里遗漏的可能有利于这个系统的架构的东西吗?