我有一个程序,它有大量传感器以相当高的速率产生数据,以及需要使用它的消费者。消费者以非常不同的价格消费。
由于我使用的是 IObserver/IObservable,因此简单的解决方案是简单地为每个事件创建一个任务,并将 OnNext() 调用和数据包装在 lamda 中。这非常有效,我很惊讶原始调用的开销如此之小。
问题是这些消费者中的一些需要严格执行事件的顺序,并且不能错过任何事件。“PerferFairness”还不够好。
我想出的最佳解决方案不是包装 event/OnNext() 对,而是将 Insert 包装到 ParallelQueue 中,每个消费者一个队列,并在队列的另一端有一个线程来制作 OnNext()来电。
这种方法的三个直接问题。它比 Task/OnNext() 包装解决方案慢得多。ParallelQueue 没有阻塞出列(或者有吗?),所以实现有点棘手。第三,这似乎是一个常见的问题,我无法想象没有某种方法可以强制执行我错过的命令,也许就像多个任务工厂共享一个底层池,每个工厂都有一些设置,使它们严格执行命令。
任何人都知道实现我想要做的事情的正确方法吗?
编辑:任何涉及每个消费者或生产者线程的解决方案都不起作用。生产者/消费者形成长链,每条链有数百个。