3

让我们考虑以下流

  SomeState state = new SomeState().

 _refreshFiberStream =
    Stream()
    .SubscribeOn(new EventLoopScheduler()) 
    .Select(DoCalc)
    .ObserveOn(DispatcherScheduler.Current)
    .Subscribe(Update);

DoCalc 方法将投影到输入中并使用“状态”,并将结果输出提供给 Update 方法,该方法将修改“状态”。如果有新事件出现,它应该根据上一个事件最后更新的状态进行操作,并以此为基础进行项目。

我正在寻找一种始终按顺序执行事件的方式。例如,如果我有三个事件,我正在寻找一种方法,以便它们在 DoCalc 中执行,Update 后跟 DoCalc,Update 后跟 DoCalc,Update。

相反,我看到的是 DoCalc、DoCalc、Update、Update、DoCalc、Update,即它们从不按顺序运行。

有没有办法在 Rx 中强制执行它

4

1 回答 1

2

我看到一方面需要按顺序执行,另一方面需要分派到另一个线程。我的建议是Update分成两部分:

  1. 需要按顺序执行的部分(更新)
  2. 需要分派的部分(Dispatch)

然后您可以Do(Update)按顺序调用,并Subscribe(Dispatch)在调度程序上:

var result =
    Stream()
        .SubscribeOn(new EventLoopScheduler())
        .Select(DoCalc)
        .Do(Update)
        .ObserveOn(DispatcherScheduler.Current)
        .Subscribe(Dispatch);

结果序列是这样的(“Dispatch n”调用可能在“Update n”之后的任何时间发生):

    选择一个
    更新一个
    选择 b
    更新 b
    派遣一个
    调度 b

我想另一种方法是使用 a ManualResetEvent,它指示下一个 DoCalc 只能在更新发生后继续。您可以通过添加ManualResetEvent.WaitOne到 DoCalc 和ManualResetEvent.Set更新来做到这一点:

private ManualResetEvent _wait = new ManualResetEvent(true);

private string DoCalc(string input)
{
    _wait.WaitOne();
    Console.WriteLine("Selected {0}", input);
    _wait.Reset();
    return input;
}

private void Update(string input)
{
    Console.WriteLine("Update {0}", input);
    _wait.Set();
}

第二种方法“有效”,但是像这样的线程阻塞让我感到不安——它似乎与反应式编程有交叉用途。当然,同样,如果可能的话,最好避免引入状态。

于 2013-10-04T22:13:02.887 回答