目前,我正在使用 RX 框架来实现类似工作流的消息处理管道。本质上,我有一个消息生产者(反序列化网络消息并在主题上调用 OnNext()),并且我有几个消费者。
注意: if 和 transform 是我编写的扩展方法,它们只返回一个 IObservable。
消费者执行以下操作:
var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
.Where(y => y.Value > 5)
.Select(y => y.ComplexObject)
.If(z => z.IsPaid, respond(z))
.Do(z => SendError(z));
commerceRequest
然后被另一个类似的管道消耗,并一直持续到顶部,最后有人调用Subscribe()
最终管道。我遇到的问题是来自基地的消息不会传播,除非直接在某处对消息调用订阅。
如何将消息推送到堆栈顶部?我知道这是一种非正统的方法,但我觉得它使代码非常容易理解消息发生了什么。如果您觉得这是一个非常糟糕的主意,有人可以提出另一种方法吗?