17

使用 Rx ,从多个线程Subject调用是否是线程安全的?OnNext()

所以序列可以从多个来源生成。

合并会做同样的事情吗?

4

3 回答 3

22

Rx 合约要求通知是顺序的,并且对于多个运营商来说是逻辑上的必要。也就是说,您可以使用可用的Synchronize方法来获得这种行为。

var subject = new Subject<int>();
var syncedSubject = Subject.Synchronize(subject);            

您现在可以同时调用syncedSubject. 对于必须同步的观察者,您还可以使用:

var observer = Observer.Create<Unit>(...);
var syncedObserver = Observer.Synchronize(observer);

测试:

Func<int, Action> onNext = i => () => syncedSubject.OnNext(i);
Parallel.Invoke
(
    onNext(1),
    onNext(2),
    onNext(3),
    onNext(4)
);
于 2012-09-05T06:50:51.330 回答
6

不,序列是连续的,因此不允许重叠通知。您可以使用 Synchronize 扩展方法来强制执行正确的同步。像 Merge 这样的操作员使用锁来调用下游观察者,以确保对 On* 回调进行正确的串行调用。

于 2012-09-04T20:44:19.920 回答
6

调用someSubject.OnNext()与线程安全一样someList.Add()- 您可以从 > 1 个线程调用它,但不能同时调用。将您OnNext的声明包含在lock声明中,它将是安全的。

于 2012-09-05T06:13:45.900 回答