我有一个发布通用消息的服务,并且我为这些消息创建了一个 observable。这些消息可以包含任何内容,并且可以在顶部分层不同的协议。
我希望添加第二层 observable 来解释这些消息中的特定协议。例如,消息可以是“更新”、“错误”或“完成”类型。我希望重新发布“更新”消息,在“错误”上抛出错误,并在“完成”上完成序列。
我怎样才能干净地做到这一点?
我不认为我可以用它SelectMany
来做到这一点;虽然选择器可以返回Observable.Return()
,或者Observable.Throw()
对于前两种情况,我无法从选择器中完成(调用observer.OnCompleted()
和取消订阅底层的 observable)。
在我看来,我必须使用Observable.Create()
并订阅 subscribe 方法中的底层 observable。我已经这样做了,但实现对我来说感觉很奇怪,因为它没有使用 Rx 中更常见的函数式组合样式。这是正确的方法吗?
public IObservable<Message> InterpretProtocol(IObservable<message> stream)
{
return Observable.Create<Message>(observer =>
{
return stream.Subscribe(message =>
{
switch (ProtocolMessageTypeOf(message))
{
case ProtocolMessageType.Error:
observer.OnError(new InvalidOperationException(message));
break;
case ProtocolMessageType.Complete:
observer.OnCompleted();
break;
default:
observer.OnNext(message);
break;
}
});
});
}