0

我正在测试尝试测试 RX 并创建 Stream() ,它提供两个相隔 1 秒的事件。

private IObservable<string> Stream()
{
    return Observable.Create<string>
    (
        (IObserver<string> observer) =>
        {
            observer.OnNext("a");
            observer.OnNext("b");
            observer.OnCompleted();         
            return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
        }
    );
}

  _refreshFiberStream =
    Stream()
    .SubscribeOn(schedulerProvider.EventLoop) 
    .Select(DoCalc)
    .ObserveOn(schedulerProvider.Dispatcher)
    .Subscribe(Update);

和 ScheduleProvider

 public sealed class SchedulerProvider : ISchedulerProvider
  {
    public IScheduler Dispatcher
    {
      get { return DispatcherScheduler.Current; }
    }

    public IScheduler EventLoop
    {
      get { return new EventLoopScheduler(); }
    }
    // ...
  }

我看到每个输入都会调用 DoCalc 方法两次,然后是调用两次的 Update 方法,DoCalc、DoCalc、Update、Update。相反,我试图对 DoCalc 方法和 Update 方法进行排序。重复第二个输入的序列,因此第二个输入可以建立在第一个输入的结果之上,DoCalc,Update,DoCalc,Update

有什么想法吗

4

1 回答 1

0

首先,我将遵循将您的 SubscribeOn/ObserveOn 对放在您的最终订阅方法之前的模式。这只会为您节省大量的头发拉扯。

_refreshFiberStream = Stream()
    .Select(DoCalc)
    //Always in this pattern
    .SubscribeOn(schedulerProvider.EventLoop) //SubscribeOn right before ObserveOn (or before Subscribe if no ObserveOn is used)
    .ObserveOn(schedulerProvider.Dispatcher)  //ObserveOn right before Subscribe
    .Subscribe(Update);                       //Subscribe Last

接下来,如果我重新创建您的示例并添加缺少的代码,我会得到正确/预期的输出。

void Main()
{
    var els = new EventLoopScheduler();
    var dispatcher = new EventLoopScheduler();

    Stream()
        .SubscribeOn(els) //TODO: Move to line just before ObserveOn
        .Select(DoCalc)
        .ObserveOn(dispatcher)
        .Subscribe(Update); 
}

// Define other methods and classes here
private IObservable<string> Stream()
{
    return Observable.Create<string>
    (
        (IObserver<string> observer) =>
        {
            observer.OnNext("a");
            Thread.Sleep(1000);
            observer.OnNext("b");
            observer.OnCompleted();         
            return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
        }
    );
}
private string DoCalc(string val)
{
    val.Dump("DoCalc()");
    Thread.Sleep(1000);
    return val;
}
private void Update(string val)
{
    val.Dump("Update()");
}

那么您可以重新发布带有输出的完整示例代码吗?作为单元测试、控制台应用程序或 LinqPad 示例执行此操作。

猜测您的下一个问题,可能是:如何使用前一个值的投影 ( Select) 中的值来帮助计算下一个值?Scan如果您想要一组正在运行的计算,请使用该Aggregate方法,或者如果您只想要最终结果而不是计算每个值,请使用该方法。

于 2013-10-04T08:26:49.747 回答