0

我有以下异步队列处理路由。

      var commandQueue = new BlockingCollection<MyCommand>();
      commandQueue
            .GetConsumingEnumerable()
            .ToObservable(new LimitedConcurrencyLevelTaskPoolScheduler(5))
            .Subscribe(c =>
                           {
                               try
                               {
                                   ProcessCommand(c);
                               }
                               catch (Exception ex)
                               {
                                   Trace.TraceError(ex.ToString());
                               }
                           }
            );

在一种特定情况下(当我要获取一些数据时),我需要确保我的 commandQueue 为空,然后再出去获取数据。此操作预计将同步发生。基本上,我想做类似的事情

  public void GetData()
  {
     commandQueue.WaitForEmpty(); 

     // could potentially be expressed: 
     // while (commandQueue.Count > 0) Thread.Sleep(10);

     return GoGetTheData()
  }

我意识到在理想情况下,所有调用者都将“GetData”异步......但有时它必须以同步方式发生......所以我需要等待命令队列为空以确保一致性和我的数据的最新性。

我知道如何使用 ManualResetEvent 很容易地做到这一点……但我想知道 System.Reactive/TPL 是否有简单的方法。

谢谢。

4

3 回答 3

1

这是一个比起初看起来更困难的问题。您想要BlockingCollection(和底层ConcurrentQueue)生产者-消费者工作语义。但是您还希望能够观察这些集合发生了什么,包括等待“空”信号。

最好的办法是从这里看JobQueue一下ParallelJobQueue

http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2817c6e5-e5a4-4aac-91c1-97ba7de88ff7

其中包括一个 observableWhenQueueEmpty并且可以控制同时运行的作业和排队作业的数量(在这种情况下,作业与您的命令概念同义)。

于 2012-06-28T00:46:11.210 回答
0

你能用这个吗?

    var dataObservable = Observable.Start(() =>
    {
        commandQueue.WaitForEmpty(); 
        return GoGetTheData();
    });
于 2012-06-27T07:04:09.913 回答
0

在我看来你的要求是

  • 异步获取数据
  • 并行处理这些数据(最多 5 度并行度)
  • 重复该过程

如果这些是您的要求并且您没有被迫使用 BlockingCollection 即它不是现有的 API,那么我认为您可以单独使用 Rx 很容易地解决这个问题。

var dataRequestScheduler = new EventLoopScheduler();
var subscription = GetTheData()
    .Repeat()
    .SubscribeOn(dataRequestScheduler)
    .ObserveOn(Scheduler.TaskPool)//new LimitedConcurrencyLevelTaskPoolScheduler(5)
    .Subscribe(c =>
           {
               try
               {
                   ProcessCommand(c);
               }
               catch (Exception ex)
               {
                   Trace.TraceError(ex.ToString());
               }
           }
        );

GetTheData 方法在哪里返回一个 IObservable

您可以利用 Observable.Start 和 Merge(5) 来获得最多 5 个线程,而无需自定义调度程序。

于 2012-07-06T11:38:01.313 回答