3

我想实现一个队列,该队列能够从多个线程中的多个生产者那里获取事件/项目,并在单个线程上使用它们。这个队列将在一些关键环境中工作,所以我很关心它的稳定性。

我已经使用 Rx 功能实现了它,但我有两个问题:

  1. 这个实现好吗?或者也许它在某种我不知道的方面存在缺陷?(作为替代方案 - 使用队列和锁手动实现)
  2. Dispatcher 的缓冲区长度是多少?它可以处理 10 万个排队的项目吗?

下面的代码使用简单的 TestMethod 说明了我的方法。它的输出显示所有值都是从不同的线程输入的,但在另一个单线程上处理。

[TestMethod()]
public void RxTest()
{
    Subject<string> queue = new Subject<string>();

    queue
        .ObserveOnDispatcher()
        .Subscribe(s =>
                        {
                            Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", s, Thread.CurrentThread.ManagedThreadId);
                        },
                    () => Dispatcher.CurrentDispatcher.InvokeShutdown());

    for (int j = 0; j < 10; j++)
    {
        ThreadPool.QueueUserWorkItem(o =>
        {
            for (int i = 0; i < 100; i++)
            {
                Thread.Sleep(10);
                queue.OnNext(string.Format("value: {0}, from thread: {1}", i.ToString(), Thread.CurrentThread.ManagedThreadId));
            }
            queue.OnCompleted();
        });
    }


    Dispatcher.Run();
}
4

2 回答 2

4

我不确定Subject在大量多线程场景中的行为。我可以想象,尽管在您所谈论的情况下,类似BlockingCollection(及其底层)的东西都很好用。ConcurrentQueue并且易于启动。

var queue = new BlockingCollection<long>();

// subscribing
queue.GetConsumingEnumerable()
     .ToObservable(Scheduler.NewThread)
     .Subscribe(i => Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId));

// sending
Observable.Interval(TimeSpan.FromMilliseconds(500), Scheduler.ThreadPool)
          .Do(i => Debug.WriteLine("Value: {0}, Sent on ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId))
          .Subscribe(i => queue.Add(i));

你当然不想接触队列和锁。该ConcurrentQueue实现非常出色,并且肯定会有效地处理您正在谈论的大小队列。

于 2012-07-02T11:27:21.133 回答
3

看看EventLoopScheduler。它内置在 RX 中,我认为它可以满足您的所有需求。

您可以采用任意数量的可观察对象,调用.ObserveOn(els)els是您的实例EventLoopScheduler),您现在正在将多个线程中的多个可观察对象编组到单个线程上,并将每个调用OnNext串行排队。

于 2012-07-02T11:52:55.097 回答