我想使用 Rx 扩展来处理长文件绑定操作的并行化。
工作流程大致如下:
- 在多个驱动器上搜索给定的文件模式(假设每个驱动器位于单独的物理设备上)
- 对于找到的每个匹配文件,将长文件操作排队到与同一驱动器上的其他文件相同的线程- 希望最大限度地减少随机搜索。
- 对不同驱动器上的文件的操作应排队到不同的线程以允许并行处理。
我的问题是:我应该使用什么 Rx 调度程序(或调度程序组合)?
我想使用 Rx 扩展来处理长文件绑定操作的并行化。
工作流程大致如下:
我的问题是:我应该使用什么 Rx 调度程序(或调度程序组合)?
为此,意识到每个 Rx observable 订阅都是连续工作的,这一点非常有用。也就是说,对于单个 observable 的单个订阅,您可以确保一个项目的onNext
委托onNext
在下一个项目开始之前完成。
默认情况下,onNext
委托在当前线程(调用 的线程OnNext()
)上执行,但您可以使用ObserveOn()
.
这对您来说意味着您应该为每个物理驱动器创建一个单独的 observable,并在单独的线程上观察每个驱动器。如果您要执行一个可观察的操作,那么一种方法是使用GroupBy()
.
使用哪个特定的调度程序?我认为这几乎没有关系。ObserveOn()
似乎在ScheduleLongRunning()
可用时使用,这对于最常见的调度程序意味着它将创建一个新线程进行观察。
将所有这些放在一起,您的代码可能类似于:
operations.GroupBy(op => op.Drive)
.Select(o => o.ObserveOn(TaskPoolScheduler.Default))
.Do(o => o.Subscribe(op => op.Execute()))
.Subscribe();
(假设operations
您的操作类型是可观察的,它具有Drive
属性和Execute()
方法。)