1

我有一个发布通用消息的服务,并且我为这些消息创建了一个 observable。这些消息可以包含任何内容,并且可以在顶部分层不同的协议。

我希望添加第二层 observable 来解释这些消息中的特定协议。例如,消息可以是“更新”、“错误”或“完成”类型。我希望重新发布“更新”消息,在“错误”上抛出错误,并在“完成”上完成序列。

我怎样才能干净地做到这一点?

我不认为我可以用它SelectMany来做到这一点;虽然选择器可以返回Observable.Return(),或者Observable.Throw()对于前两种情况,我无法从选择器中完成(调用observer.OnCompleted()和取消订阅底层的 observable)。

在我看来,我必须使用Observable.Create()并订阅 subscribe 方法中的底层 observable。我已经这样做了,但实现对我来说感觉很奇怪,因为它没有使用 Rx 中更常见的函数式组合样式。这是正确的方法吗?

public IObservable<Message> InterpretProtocol(IObservable<message> stream)
{
  return Observable.Create<Message>(observer =>
  {
    return stream.Subscribe(message =>
    {
      switch (ProtocolMessageTypeOf(message))
      {
        case ProtocolMessageType.Error:
          observer.OnError(new InvalidOperationException(message));
          break;

        case ProtocolMessageType.Complete:
          observer.OnCompleted();
          break;

        default:
          observer.OnNext(message);
          break;
      }
    });
  });
}
4

1 回答 1

2

您可以尝试以下方法:

public IObservable<Message> InterpretProtocol(IObservable<message> stream) {

  return stream.
         TakeWhile(msg => ProtocolMessageTypeOf(message) != ProtocolMessageType.Complete).
         Select(msg => {
             if(ProtocolMessageTypeOf(message) == ProtocolMessageType.Error)
               throw new InvalidOperationException(message);
             else
               return msg;
        });

}
于 2013-07-03T10:52:27.897 回答