我有一个由网络 I/O 频繁触发的事件源,基于底层设计,当然事件每次总是在不同的线程上,现在我通过 Rx 包装了这个事件:Observable.FromEventPattern(...)
,现在我使用TakeWhile(predict)
to过滤一些特殊事件数据。目前,我对它的线程安全性、hit 和 muteTakeWhile(predict)
的工作有些担心,但在并发的情况下,还能保证吗?因为我猜底层实现可能是(我无法阅读源代码,因为它太复杂了......):
public static IObservable<TSource> TakeWhile<TSource>(this IObservable<TSource> source, Func<TSource, bool> predict)
{
ISubject<TSource> takeUntilObservable = new TempObservable<TSource>();
IDisposable dps = null;
// 0 for takeUntilObservable still active, 1 for predict failed, diposed and OnCompleted already send.
int state = 0;
dps = source.Subscribe(
(s) =>
{
/* NOTE here the 'hit and mute' still not thread safe, one thread may enter 'else' and under CompareExchange, but meantime another thread may passed the predict(...) and calling OnNext(...)
* so the CompareExchange here mainly for avoid multiple time call OnCompleted() and Dispose();
*/
if (predict(s) && state == 0)
{
takeUntilObservable.OnNext(s);
}
else
{
// !=0 means already disposed and OnCompleted send, avoid multiple times called via parallel threads.
if (0 == Interlocked.CompareExchange(ref state, 1, 0))
{
try
{
takeUntilObservable.OnCompleted();
}
finally
{
dps.Dispose();
}
}
}
},
() =>
{
try
{
takeUntilObservable.OnCompleted();
}
finally { dps.Dispose(); }
},
(ex) => { takeUntilObservable.OnError(ex); });
return takeUntilObservable;
}
这TempObservable
只是 ISubject 的一个简单实现。
如果我的猜测合理,那么似乎无法保证线程安全,这意味着一些意外事件数据可能仍会传入 OnNext(...) 因为“静音”仍在进行中。然后我写了一个简单的测试来验证,但出乎意料,结果都是肯定的:
public class MultipleTheadEventSource
{
public event EventHandler OnSthNew;
int cocurrentCount = 1000;
public void Start()
{
for (int i = 0; i < this.cocurrentCount; i++)
{
int j = i;
ThreadPool.QueueUserWorkItem((state) =>
{
var safe = this.OnSthNew;
if (safe != null)
safe(j, null);
});
}
}
}
[TestMethod()]
public void MultipleTheadEventSourceTest()
{
int loopTimes = 10;
int onCompletedCalledTimes = 0;
for (int i = 0; i < loopTimes; i++)
{
MultipleTheadEventSource eventSim = new MultipleTheadEventSource();
var host = Observable.FromEventPattern(eventSim, "OnSthNew");
host.TakeWhile(p => { return int.Parse(p.Sender.ToString()) < 110; }).Subscribe((nxt) =>
{
//try print the unexpected values, BUT I Never saw it happened!!!
if (int.Parse(nxt.Sender.ToString()) >= 110)
{
this.testContextInstance.WriteLine(nxt.Sender.ToString());
}
}, () => { Interlocked.Increment(ref onCompletedCalledTimes); });
eventSim.Start();
}
// simply wait everything done.
Thread.Sleep(60000);
this.testContextInstance.WriteLine("onCompletedCalledTimes: " + onCompletedCalledTimes);
}
在我进行测试之前,这里的一些朋友建议我尝试使用Synchronize<TSource>
或ObserveOn
使其线程安全,所以对我的进展想法有什么想法以及为什么问题没有重现?