我正在编写一个 C# (.NET 4.5) 应用程序,用于聚合基于时间的事件以进行报告。为了使我的查询逻辑可重用于实时和历史数据,我使用了 Reactive Extensions (2.0) 及其IScheduler
基础设施(HistoricalScheduler
和朋友)。
例如,假设我们创建了一个事件列表(按时间顺序排序,但它们可能重合!),其唯一的有效负载是它们的时间戳,并且想知道它们在固定持续时间的缓冲区中的分布:
const int num = 100000;
const int dist = 10;
var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();
var time = new HistoricalScheduler(curr);
for (int i = 0; i < num; i++)
{
events.Add(curr);
curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}
var stream = Observable.Generate<int, DateTimeOffset>(
0,
s => s < events.Count,
s => s + 1,
s => events[s],
s => events[s],
time);
stream.Buffer(TimeSpan.FromMilliseconds(num), time)
.Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));
time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));
运行此代码会产生System.StackOverflowException
以下堆栈跟踪(一直是最后 3 行):
mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes
...
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes
...
好的,问题似乎来自我Observable.Generate()
对 的使用,具体取决于列表大小(num
)并且无论调度程序的选择如何。
我究竟做错了什么?IObservable
或者更一般地说,从IEnumerable
提供自己时间戳的事件中创建事件的首选方法是什么?