我订阅了一个推送频率很高的 Observable,这些内容来自网络 I/O,所以每次推送最初来自不同的线程,然后我有一些观察者可能会尝试获取一些内容然后快速取消订阅以确保有没有其他内容传入,因此代码示例如下:
IDisposable dsp = null;
dsp = TargetObservable.Subscribe((incomingContent) =>
{
if (incomingContent == "something")
{
myList.Add(incomingContent);
dsp.Dispose();
}
else
{
otherList.Add(incomingContent);
}
});
目前, OnNext 显然不是线程安全的,这意味着当观察者获得“某物”并且就在调用 Dispose() 之前,其他内容可能仍会传入并添加到 'otherList',即使我放了一个 'lock(.. .)' 代表整个 'onNext(...)'。
这不是我们想要的,所以有什么想法可以避免这种情况吗?我能想到的一种方法是修改 Observable 以逐个推送内容(通过使用'lock'),那么性能肯定会受到很大影响。谢谢。