2

我想使用 Rx 扩展来处理长文件绑定操作的并行化。

工作流程大致如下:

  • 在多个驱动器上搜索给定的文件模式(假设每个驱动器位于单独的物理设备上)
  • 对于找到的每个匹配文件,将长文件操作排队到与同一驱动器上的其他文件相同的线程- 希望最大限度地减少随机搜索。
  • 对不同驱动器上的文件的操作应排队到不同的线程以允许并行处理。

我的问题是:我应该使用什么 Rx 调度程序(或调度程序组合)?

4

1 回答 1

6

为此,意识到每个 Rx observable 订阅都是连续工作的,这一点非常有用。也就是说,对于单个 observable 的单个订阅,您可以确保一个项目的onNext委托onNext在下一个项目开始之前完成。

默认情况下,onNext委托在当前线程(调用 的线程OnNext())上执行,但您可以使用ObserveOn().

这对您来说意味着您应该为每个物理驱动器创建一个单独的 observable,并在单独的线程上观察每个驱动器。如果您要执行一个可观察的操作,那么一种方法是使用GroupBy().

使用哪个特定的调度程序?我认为这几乎没有关系。ObserveOn()似乎在ScheduleLongRunning()可用时使用,这对于最常见的调度程序意味着它将创建一个新线程进行观察。

将所有这些放在一起,您的代码可能类似于:

operations.GroupBy(op => op.Drive)
          .Select(o => o.ObserveOn(TaskPoolScheduler.Default))
          .Do(o => o.Subscribe(op => op.Execute()))
          .Subscribe();

(假设operations您的操作类型是可观察的,它具有Drive属性和Execute()方法。)

于 2013-04-05T00:51:07.530 回答