使用 Rx ,从多个线程Subject
调用是否是线程安全的?OnNext()
所以序列可以从多个来源生成。
合并会做同样的事情吗?
使用 Rx ,从多个线程Subject
调用是否是线程安全的?OnNext()
所以序列可以从多个来源生成。
合并会做同样的事情吗?
Rx 合约要求通知是顺序的,并且对于多个运营商来说是逻辑上的必要。也就是说,您可以使用可用的Synchronize
方法来获得这种行为。
var subject = new Subject<int>();
var syncedSubject = Subject.Synchronize(subject);
您现在可以同时调用syncedSubject
. 对于必须同步的观察者,您还可以使用:
var observer = Observer.Create<Unit>(...);
var syncedObserver = Observer.Synchronize(observer);
测试:
Func<int, Action> onNext = i => () => syncedSubject.OnNext(i);
Parallel.Invoke
(
onNext(1),
onNext(2),
onNext(3),
onNext(4)
);
不,序列是连续的,因此不允许重叠通知。您可以使用 Synchronize 扩展方法来强制执行正确的同步。像 Merge 这样的操作员使用锁来调用下游观察者,以确保对 On* 回调进行正确的串行调用。
调用someSubject.OnNext()
与线程安全一样someList.Add()
- 您可以从 > 1 个线程调用它,但不能同时调用。将您OnNext
的声明包含在lock
声明中,它将是安全的。