我正在测试尝试测试 RX 并创建 Stream() ,它提供两个相隔 1 秒的事件。
private IObservable<string> Stream()
{
return Observable.Create<string>
(
(IObserver<string> observer) =>
{
observer.OnNext("a");
observer.OnNext("b");
observer.OnCompleted();
return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
}
);
}
_refreshFiberStream =
Stream()
.SubscribeOn(schedulerProvider.EventLoop)
.Select(DoCalc)
.ObserveOn(schedulerProvider.Dispatcher)
.Subscribe(Update);
和 ScheduleProvider
public sealed class SchedulerProvider : ISchedulerProvider
{
public IScheduler Dispatcher
{
get { return DispatcherScheduler.Current; }
}
public IScheduler EventLoop
{
get { return new EventLoopScheduler(); }
}
// ...
}
我看到每个输入都会调用 DoCalc 方法两次,然后是调用两次的 Update 方法,DoCalc、DoCalc、Update、Update。相反,我试图对 DoCalc 方法和 Update 方法进行排序。重复第二个输入的序列,因此第二个输入可以建立在第一个输入的结果之上,DoCalc,Update,DoCalc,Update
有什么想法吗