3

我有一个程序,它有大量传感器以相当高的速率产生数据,以及需要使用它的消费者。消费者以非常不同的价格消费。

由于我使用的是 IObserver/IObservable,因此简单的解决方案是简单地为每个事件创建一个任务,并将 OnNext() 调用和数据包装在 lamda 中。这非常有效,我很惊讶原始调用的开销如此之小。

问题是这些消费者中的一些需要严格执行事件的顺序,并且不能错过任何事件。“PerferFairness”还不够好。

我想出的最佳解决方案不是包装 event/OnNext() 对,而是将 Insert 包装到 ParallelQueue 中,每个消费者一个队列,并在队列的另一端有一个线程来制作 OnNext()来电。

这种方法的三个直接问题。它比 Task/OnNext() 包装解决方案慢得多。ParallelQueue 没有阻塞出列(或者有吗?),所以实现有点棘手。第三,这似乎是一个常见的问题,我无法想象没有某种方法可以强制执行我错过的命令,也许就像多个任务工厂共享一个底层池,每个工厂都有一些设置,使它们严格执行命令。

任何人都知道实现我想要做的事情的正确方法吗?

编辑:任何涉及每个消费者或生产者线程的解决方案都不起作用。生产者/消费者形成长链,每条链有数百个。

4

2 回答 2

4

TPL DataFlow 库可能非常适合您的应用程序。它使用数据流范例扩展了 TPL,允许您配置处理图并在高性能执行环境中运行。

TPL 数据流可作为 .NET 4 之上的库使用,并且应作为 .NET 4.5 的一部分提供。

于 2011-11-07T13:13:58.490 回答
1

没有评论强制执行部分排序的最佳抽象,但如果你在 aBlockingCollection<T>周围使用包装器ConcurrentQueue<T>,这将为你提供一个阻止Take元素出列的操作。例如:

// the default is ConcurrentQueue, so you don't have to specify, but if you
// wanted different behavior you could use e.g. ConcurrentStack

var coll = new BlockingCollection<int>(new ConcurrentQueue<int>());

coll.Add(5); // blocks if the collection is at max capacity

int five = coll.Take(); // blocks if the collection is empty
于 2010-10-29T19:48:36.707 回答