我的问题是Rx 中 Advanceable 历史流和实时流的扩展
我希望能够使用各种滚动时间范围(每小时、每天等)监控我的流中的变化。如果我的历史数据存在差距,例如在每小时时间范围内,我的数据可以追溯到 30' 之前和 120' 之前,即我缺少 [-120' 到 -30'] 的数据间隔,我想基于 120' 观察的当前变化。到目前为止,我所拥有的代码的问题是上述示例中的更改将基于值 0,因为缺少 -60 观察值。
我不知道如何完成这个,或者是否有一个更清洁、更好的解决方案。
void Main()
{
const double bufferDuration = 8;
var now = DateTimeOffset.Now;
// create a historical log for testing
var log = new List<ValueTime>
{
new ValueTime { Ts = now.AddMilliseconds(-5000).ToString(), Balance = 1L },
new ValueTime { Ts = now.AddMilliseconds(-4000).ToString(), Balance = 2L },
new ValueTime { Ts = now.AddMilliseconds(-3000).ToString(), Balance = 4L }
};
var scheduler = new HistoricalScheduler();
scheduler.AdvanceTo(DateTime.Parse(log[0].Ts).AddSeconds(-0));
// historical part of the stream
var replay = Observable.Generate(
log.ToObservable().GetEnumerator(),
events => events.MoveNext(),
events => events,
events => events.Current.Balance,
events => DateTime.Parse(events.Current.Ts),
scheduler);
// create the real time part of the stream using a timer
var realTime = Observable.Create<long>(observer =>
{
var timer = Observable.Interval(TimeSpan.FromSeconds(2)).Select(x => 10 + x);
var disposable = timer.Subscribe(x => observer.OnNext(x));
return disposable;
});
// combine the two streams
var combined = replay
.Concat(realTime)
.Publish()
.RefCount();
combined.Timestamp(scheduler).Dump("Combined stream");
// use the real time stream to set the time of the historical scheduler's
realTime.Subscribe(_ =>
{
scheduler.AdvanceTo(DateTime.Now);
},
ex => Console.WriteLine(ex.Message),
() => Console.WriteLine("Done"));
// use a rolling buffer of "bufferDuration" length
var combinedBuffer = combined
.RollingBuffer(TimeSpan.FromSeconds(bufferDuration), scheduler)
.Select(x => x.Sum());
combinedBuffer.Timestamp(scheduler).Dump($"{bufferDuration}' Rolling buffer aggregation");
// display the values that are included in each window of the rolling buffer
combined
.RollingBuffer(TimeSpan.FromSeconds(bufferDuration), scheduler)
.Select(x => string.Join(",", x))
.Timestamp(scheduler).Dump($"{bufferDuration}' Rolling buffer lists");
// observe the difference between two consecutive observations
combinedBuffer.CombineWithPrevious((x, y) => y - x).Timestamp(scheduler).Dump($"{bufferDuration}' Rolling differences");
scheduler.Start();
}
class ValueTime
{
public long Balance;
public string Ts;
}
static class Extensions
{
public static IObservable<T[]> RollingBuffer<T>(
this IObservable<T> @this,
TimeSpan buffering, IScheduler scheduler)
{
return Observable.Create<
T[]>(o =>
{
var list = new LinkedList<Timestamped<T>>();
return @this.Timestamp(scheduler).Subscribe(tx =>
{
list.AddLast(tx);
while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
{
list.RemoveFirst();
}
o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
}, o.OnError, o.OnCompleted);
});
}
public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(this IObservable<TSource> source,
Func<TSource, TSource, TResult> resultSelector)
{
return source.Scan(
Tuple.Create(default(TSource), default(TSource)),
(previous, current) => Tuple.Create(previous.Item2, current))
.Select(t => resultSelector(t.Item1, t.Item2));
}
}
在 LINQPad 中运行它会产生以下结果:
在“8'滚动差异”表中,上午 1:13:30 的条目应该是 9 而不是 13,因为我希望将在上午 1:13:19 产生的 4 的值包含在计算中区别。
滚动缓冲区的当前版本,或者我为此采取的方法似乎是不够的,它可能会分散我想要实现的目标。我不能使用基于计数的缓冲区,因为如果我的观察中有漏洞,这可能会让我回到过去。例如,如果我在以下示例中使用计数 2,则 10:00:00 滚动差异将使用 x2 – x0,这不是我要找的结果。
我正在寻找的行为是获取当前值与 >= 8'' 之前(8'' 之前或 8'' 之前)的滚动差异。例如
Ts: 9:32:00 9:36:00 10:00:00 10:00:04 10:00:08 10:00:12 10:00:16
Level: --x0------x1-------x2-------x3-------x4-------x5-------x6---
Roll D.--x0----(x1-x0)-(x2-x1)--(x3-x1)--(x4-x2)--(x5-x3)--(x6-x4)-