我正试图围绕 Reactive Extensions 对并发的支持,并且很难获得我想要的结果。所以我可能还不明白。
我有一个源,它向流中发送数据的速度比订阅者消耗它的速度要快。我更喜欢配置流,以便使用另一个线程为流中的每个新项目调用订阅者,以便订阅者有多个线程同时通过它运行。我能够确保订阅者的线程安全。
以下示例演示了该问题:
Observable.Interval( TimeSpan.FromSeconds(1))
.Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x))
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(x =>
{
Console.WriteLine("{0} Thread: {1} Observed value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x);
Thread.Sleep(5000); // Simulate long work time
});
控制台输出如下所示(删除日期):
4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 Source value: 4
4:25:25 PM Thread: 11 Observed value: 1
4:25:25 PM Thread: 12 Source value: 5
4:25:26 PM Thread: 6 Source value: 6
请注意“观察值”时间增量。即使源继续发出数据的速度快于订阅者处理它的速度,订阅者也不会被并行调用。虽然我可以想象当前行为有用的一堆场景,但我需要能够在消息可用时立即处理它们。
我用 ObserveOn 方法尝试了几种调度程序的变体,但它们似乎都没有做我想要的。
除了在订阅操作中分离一个线程以执行长时间运行的工作之外,我还缺少什么可以将数据并发传递给订阅者吗?
提前感谢所有答案和建议!