1

我正在使用来自 NuGet 的最新 Rx2。在下面的示例代码中,订阅是在新线程上处理的,不会阻止新值的生成。这都很好。但是,为什么 observable 生成的值会在新线程上按顺序处理?

static void Main(string[] args)
{
  printThreadId("Started");
  var observable = Observable.Generate<int, int>(
      0,
      x => {
        printThreadId("Comparing " + x);
        return x < 5; },
      x => {
        printThreadId("Incrementing " + x);
        return x + 1; },
      x => {
        printThreadId("Selecting " + x);
        return x; },
      NewThreadScheduler.Default
  );

  var disp = observable
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(state =>
    {
      printThreadId("Processing " + state);
      Thread.Sleep(1000);
    });

  Console.ReadLine();
  disp.Dispose();
}

static Action<string> printThreadId = (prefix) => Console.WriteLine("{0} on [Worker.{1}]", prefix, Thread.CurrentThread.ManagedThreadId);

上面的代码大约需要 5 秒才能完成。我希望每个动作都在一个新线程上执行,但这并没有发生。

下面的代码安排了一个新任务,整个过程在 1 秒多一点的时间内完成。

var disp = observable
 .ObserveOn(NewThreadScheduler.Default)
 .Subscribe(state =>
 {
   printThreadId("Processing " + state);
   NewThreadScheduler.Default.Schedule(() =>
   {
      printThreadId("Long running task " + state);
      Thread.Sleep(1000);
   });
 });

有没有更好的方法来安排长时间运行的任务同时使用ObserveOnand运行Subscribe

4

1 回答 1

3

反应式框架特别确保后续值被序列化而没有重叠。这有助于阻止许多并发问题。此行为是设计使然。

NewThreadScheduler动作排队,因此如果有背靠背调度的动作,调度程序不会打扰创建新线程 - 它只使用当前线程。当存在间隙时(某些东西可能已排队,但它是为将来准备的),则允许当前线程结束,并在操作到期时创建一个新线程。

如果您希望订阅并行运行,您必须不遗余力地实现它。但好消息是它并不太难。

如果你保留现有的observable,那么你可以试试这个:

Func<int, IObservable<Unit>> process = state =>
    Observable.Start(() =>
    {
        printThreadId("Processing " + state);
        Thread.Sleep(1000);
        return Unit.Default;
    });

var query =
    from x in observable
    select process(x);

var disp =
    query
        .Merge()
        .ObserveOn(NewThreadScheduler.Default)
        .Subscribe();

当我运行它时,我得到了这个结果:

Started on [Worker.23]
Comparing 0 on [Worker.26]
Selecting 0 on [Worker.26]
Processing 0 on [Worker.20]
Incrementing 0 on [Worker.26]
Comparing 1 on [Worker.26]
Selecting 1 on [Worker.26]
Incrementing 1 on [Worker.26]
Comparing 2 on [Worker.26]
Selecting 2 on [Worker.26]
Processing 1 on [Worker.18]
Incrementing 2 on [Worker.26]
Comparing 3 on [Worker.26]
Selecting 3 on [Worker.26]
Processing 3 on [Worker.13]
Incrementing 3 on [Worker.26]
Comparing 4 on [Worker.26]
Selecting 4 on [Worker.26]
Processing 4 on [Worker.12]
Incrementing 4 on [Worker.26]
Comparing 5 on [Worker.26]
Processing 2 on [Worker.8]
于 2012-10-30T04:06:04.647 回答