16

我正试图围绕 Reactive Extensions 对并发的支持,并且很难获得我想要的结果。所以我可能还不明白

我有一个源,它向流中发送数据的速度比订阅者消耗它的速度要快。我更喜欢配置流,以便使用另一个线程为流中的每个新项目调用订阅者,以便订阅者有多个线程同时通过它运行。我能够确保订阅者的线程安全。

以下示例演示了该问题:

Observable.Interval( TimeSpan.FromSeconds(1))
    .Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now, 
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(x =>
               {
                   Console.WriteLine("{0} Thread: {1} Observed value: {2}",
                                     DateTime.Now,
                                     Thread.CurrentThread.ManagedThreadId, x);
                   Thread.Sleep(5000); // Simulate long work time
               });

控制台输出如下所示(删除日期):

4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 Source value: 4
4:25:25 PM Thread: 11 Observed value: 1
4:25:25 PM Thread: 12 Source value: 5
4:25:26 PM Thread: 6 Source value: 6

请注意“观察值”时间增量。即使源继续发出数据的速度快于订阅者处理它的速度,订阅者也不会被并行调用。虽然我可以想象当前行为有用的一堆场景,但我需要能够在消息可用时立即处理它们。

我用 ObserveOn 方法尝试了几种调度程序的变体,但它们似乎都没有做我想要的。

除了在订阅操作中分离一个线程以执行长时间运行的工作之外,我还缺少什么可以将数据并发传递给订阅者吗?

提前感谢所有答案和建议!

4

1 回答 1

19

这里的基本问题是,您希望 Rx observable 以一种真正违反 observable 工作规则的方式调度事件。我认为在这里查看 Rx 设计指南会很有启发性:http: //go.microsoft.com/fwlink/ ?LinkID=205219 - 最值得注意的是,“4.2 假设观察者实例以序列化方式调用”。即您不应该能够并行运行 OnNext 调用。事实上,Rx 的排序行为是其设计理念的核心。

如果您查看源代码,您会看到 Rx 在派生ScheduledObserver<T>类中强制执行此行为ObserveOnObserver<T>... OnNexts 从内部队列分派,并且每个必须在分派下一个之前完成 - 在给定的执行上下文中。Rx 不允许单个订阅者的 OnNext 调用同时执行。

这并不是说您不能让多个订阅者以不同的速率执行。事实上,如果您按如下方式更改代码,这很容易看出:

var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now,
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default);

var subscription1 = source.Subscribe(x =>
    {
        Console.WriteLine("Subscriber 1: {0} Thread: {1} Observed value: {2}",
                            DateTime.Now,
                            Thread.CurrentThread.ManagedThreadId, x);
        Thread.Sleep(1000); // Simulate long work time
    });

var subscription2 = source.Subscribe(x =>
{
    Console.WriteLine("Subscriber 2: {0} Thread: {1} Observed value: {2}",
                        DateTime.Now,
                        Thread.CurrentThread.ManagedThreadId, x);
    Thread.Sleep(5000); // Simulate long work time
});

现在您会看到订阅者 1 领先于订阅者 2。

What you can't easily do is ask an observable to do something like dispatch of an OnNext call to a "ready" subscriber - which is kind of what you are asking for in a roundabout way. I also presume you wouldn't really want to create a new thread for every OnNext in a slow consumer situation!

In this scenario it sounds like you might be better off with a single subscriber that does nothing other than push work onto a queue as fast as possible, which is in turn serviced by a number of consuming worker threads you could then control as necessary to keep pace.

于 2013-05-21T00:08:03.873 回答