Rx 定义了一个语法,从任何给定的 Observer 的角度来看,该语法明确禁止 OnNext 重叠。可以并行调用流的多个订阅者(但这取决于 Rx 运算符的实现者)。
在这里,我们有两个订阅者以不同的速率处理来自同一流的 OnNext:
void Main()
{
var stream = Observable.Interval(TimeSpan.FromSeconds(1));
var sub1 = stream.Subscribe(x => {
Console.WriteLine("Sub1 handler start: " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(4000);
Console.WriteLine("Sub1 handler end");
});
var sub2 = stream.Subscribe(x => {
Console.WriteLine("Sub2 handler start: " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(2000);
Console.WriteLine("Sub2 handler end");
});
Console.ReadLine();
}
这是输出,看看 Sub2 是如何领先于 Sub1 的,并且每个都在自己的线程上。
Sub2 handler start: 18
Sub1 handler start: 12
Sub2 handler end
Sub2 handler start: 18
Sub2 handler end
Sub2 handler start: 18
Sub1 handler end
Sub1 handler start: 12
Sub2 handler end
Sub2 handler start: 18
Sub2 handler end
Sub1 handler end
Sub2 handler start: 18
Sub1 handler start: 12
请注意,没有什么可以说每个订阅都有自己的线程 - 这取决于调度程序和操作员的实现方式。只要它们符合OnNext* (OnError | OnCompleted)的 Rx 语法,一切都会发生。
对于您的特定情况,我会研究PLINQ / TPL - 它感觉比 Rx 更合适。
顺便说一句,如果您刚刚起步,Lee Campbell 的www.introtorx.com是一个很好的资源。