我想实现一个队列,该队列能够从多个线程中的多个生产者那里获取事件/项目,并在单个线程上使用它们。这个队列将在一些关键环境中工作,所以我很关心它的稳定性。
我已经使用 Rx 功能实现了它,但我有两个问题:
- 这个实现好吗?或者也许它在某种我不知道的方面存在缺陷?(作为替代方案 - 使用队列和锁手动实现)
- Dispatcher 的缓冲区长度是多少?它可以处理 10 万个排队的项目吗?
下面的代码使用简单的 TestMethod 说明了我的方法。它的输出显示所有值都是从不同的线程输入的,但在另一个单线程上处理。
[TestMethod()]
public void RxTest()
{
Subject<string> queue = new Subject<string>();
queue
.ObserveOnDispatcher()
.Subscribe(s =>
{
Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", s, Thread.CurrentThread.ManagedThreadId);
},
() => Dispatcher.CurrentDispatcher.InvokeShutdown());
for (int j = 0; j < 10; j++)
{
ThreadPool.QueueUserWorkItem(o =>
{
for (int i = 0; i < 100; i++)
{
Thread.Sleep(10);
queue.OnNext(string.Format("value: {0}, from thread: {1}", i.ToString(), Thread.CurrentThread.ManagedThreadId));
}
queue.OnCompleted();
});
}
Dispatcher.Run();
}