0

使用 .net RX 组织数据处理器序列的最佳方法是什么?
- 一个。在 observable 上调用方法,例如 observable.Do(log).Select(transformation).Do(work).Aggregate(someState)...
- b。实现自定义观察者,如果是这样 - 如何链接它们
- c。其他选项.. 还有什么是处理 observable 本身中可能的异常的最佳选择(请参阅上面的问题)并处理 Do、Select 等内部的异常(我知道最佳实践是订阅者不应该抛出)。

此外,我有时需要允许异常作为可观察序列的某些元素返回而不停止序列(请参阅这个问题在不停止序列的情况下处理反应式扩展中的异常

4

1 回答 1

2

这似乎您需要一个工作流程而不是 Rx。似乎基于您的其他问题*您正在尝试采用看起来非常适合 ProducerConsumer 排队工作流程场景的内容,并将其强制放入 Rx。

看起来像你

  • 想要从队列中读取并阻塞,直到收到一个值,然后重新订阅。只需使用 BlockingCollection 的排队功能。当值到达时,它们可以从任何线程推送到集合中。
  • 不希望实际上有一个值序列,而是一个可能在任何给定值上失败的工作流,转移它然后处理下一个值。只需使用队列。处理每个值并将结果放入下一个适当的队列(失败/成功)->参见企业集成模式,特别是无效的消息通道和死信通道。
  • 可能并行处理这些值。请参阅 BlockingCollection.GetConsumingEnumerable 或Disruptor以获得高性能实现。使用这些工具,您可以拥有许多生产者和消费者。当然,你可以用 Rx 做到这一点,但它只是轮询和绑定一个线程。我认为最好明确说明这种事情

我认为您对 Rx 的使用应该为自己付出代价,并且您不应该发现自己与它抗争(就像任何其他技术或框架一样)。

*其他问题,例如 在不停止序列的情况下处理响应式扩展中的异常 以及 如何将 Observable 序列化到云端并返回

于 2012-11-14T18:15:21.867 回答