0

我的问题是Rx 中 Advanceable 历史流和实时流的扩展

我希望能够使用各种滚动时间范围(每小时、每天等)监控我的流中的变化。如果我的历史数据存在差距,例如在每小时时间范围内,我的数据可以追溯到 30' 之前和 120' 之前,即我缺少 [-120' 到 -30'] 的数据间隔,我想基于 120' 观察的当前变化。到目前为止,我所拥有的代码的问题是上述示例中的更改将基于值 0,因为缺少 -60 观察值。

我不知道如何完成这个,或者是否有一个更清洁、更好的解决方案。

void Main()
{   
    const double bufferDuration = 8;    
    var now = DateTimeOffset.Now;

    // create a historical log for testing
    var log = new List<ValueTime>
            {
                new ValueTime { Ts = now.AddMilliseconds(-5000).ToString(), Balance = 1L },
                new ValueTime { Ts = now.AddMilliseconds(-4000).ToString(), Balance = 2L },
                new ValueTime { Ts = now.AddMilliseconds(-3000).ToString(), Balance = 4L }
            };

    var scheduler = new HistoricalScheduler();

    scheduler.AdvanceTo(DateTime.Parse(log[0].Ts).AddSeconds(-0));

    // historical part of the stream
    var replay = Observable.Generate(
            log.ToObservable().GetEnumerator(),
            events => events.MoveNext(),
            events => events,
            events => events.Current.Balance,
            events => DateTime.Parse(events.Current.Ts),
            scheduler);

    // create the real time part of the stream using a timer
    var realTime = Observable.Create<long>(observer =>
        {
            var timer = Observable.Interval(TimeSpan.FromSeconds(2)).Select(x => 10 + x);
            var disposable = timer.Subscribe(x => observer.OnNext(x));
            return disposable;
        });

    // combine the two streams
    var combined = replay
            .Concat(realTime)
            .Publish()
            .RefCount();

    combined.Timestamp(scheduler).Dump("Combined stream");

    // use the real time stream to set the time of the historical scheduler's
    realTime.Subscribe(_ =>
        {
            scheduler.AdvanceTo(DateTime.Now);
        },
        ex => Console.WriteLine(ex.Message),
        () => Console.WriteLine("Done"));

    // use a rolling buffer of "bufferDuration" length 
    var combinedBuffer = combined
            .RollingBuffer(TimeSpan.FromSeconds(bufferDuration), scheduler)
            .Select(x => x.Sum());

    combinedBuffer.Timestamp(scheduler).Dump($"{bufferDuration}' Rolling buffer aggregation");

    // display the values that are included in each window of the rolling buffer
    combined
    .RollingBuffer(TimeSpan.FromSeconds(bufferDuration), scheduler)
    .Select(x => string.Join(",", x))
    .Timestamp(scheduler).Dump($"{bufferDuration}' Rolling buffer lists");

    // observe the difference between two consecutive observations
    combinedBuffer.CombineWithPrevious((x, y) => y - x).Timestamp(scheduler).Dump($"{bufferDuration}' Rolling differences");

    scheduler.Start();
}

class ValueTime
{
    public long Balance;
    public string Ts;
}

static class Extensions
{
    public static IObservable<T[]> RollingBuffer<T>(
        this IObservable<T> @this,
        TimeSpan buffering, IScheduler scheduler)
    {
        return Observable.Create<
        T[]>(o =>
        {
            var list = new LinkedList<Timestamped<T>>();
            return @this.Timestamp(scheduler).Subscribe(tx =>
            {
                list.AddLast(tx);
                while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
                {
                    list.RemoveFirst();
                }
                o.OnNext(list.Select(tx2 => tx2.Value).ToArray());

            }, o.OnError, o.OnCompleted);
        });
    }

    public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(this IObservable<TSource> source,
                                                                                Func<TSource, TSource, TResult> resultSelector)
    {
        return source.Scan(
            Tuple.Create(default(TSource), default(TSource)),
            (previous, current) => Tuple.Create(previous.Item2, current))
            .Select(t => resultSelector(t.Item1, t.Item2));
    }
}

在 LINQPad 中运行它会产生以下结果:

在此处输入图像描述

在“8'滚动差异”表中,上午 1:13:30 的条目应该是 9 而不是 13,因为我希望将在上午 1:13:19 产生的 4 的值包含在计算中区别。

滚动缓冲区的当前版本,或者我为此采取的方法似乎是不够的,它可能会分散我想要实现的目标。我不能使用基于计数的缓冲区,因为如果我的观察中有漏洞,这可能会让我回到过去。例如,如果我在以下示例中使用计数 2,则 10:00:00 滚动差异将使用 x2 – x0,这不是我要找的结果。

我正在寻找的行为是获取当前值与 >= 8'' 之前(8'' 之前或 8'' 之前)的滚动差异。例如

Ts:    9:32:00 9:36:00 10:00:00 10:00:04 10:00:08 10:00:12 10:00:16
Level: --x0------x1-------x2-------x3-------x4-------x5-------x6--- 
Roll D.--x0----(x1-x0)-(x2-x1)--(x3-x1)--(x4-x2)--(x5-x3)--(x6-x4)-
4

1 回答 1

0

我会认为这个问题足够普遍,已经找到了解决方案,或者至少有几个人会对它感兴趣。无论如何,在与它斗争了相当多的时间之后,这主要是由于我对 Rx 的理解不足,我发现实现所需行为的步骤是使用上面的代码作为基础,并且:

A. 修改滚动缓冲区扩展方法:

list.AddLast(tx);
while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
{
    list.RemoveFirst();
}
o.OnNext(list.Select(tx2 => tx2.Value).ToArray());

至:

    list.AddLast(tx);

    var nowTime = scheduler.Now.DateTime;
    System.Reactive.Timestamped<T> el = default(System.Reactive.Timestamped<T>);
    while (list.Count > 1 && list.First.Value.Timestamp < nowTime.Subtract(buffering))
    {
        el = list.First.Value;
        list.RemoveFirst();
    }

    if (el != default(System.Reactive.Timestamped<T>) && (list.Count <= 1 || list.First.Value.Timestamp > nowTime.Subtract(buffering)))
    {
        list.AddFirst(el);
    }
    o.OnNext(list.Select(tx2 => tx2.Value).ToArray());

B. 使用列表中最后一个元素和第一个元素之间的差值,而不是值列表的总和:

var combinedBuffer = combined
            .RollingBuffer(TimeSpan.FromSeconds(bufferDuration), scheduler)
            .Select(x => x.Last() - x.First());

C. 完全删除“CombineWithPrevious”调用。

combinedBuffer.CombineWithPrevious((x, y) => y - x).Timestamp(scheduler).Dump($"{bufferDuration}' Rolling differences");

带有一些调试打印输出的完整调用如下:

 void Main()
{
    const double bufferDuration = 8;
    var now = DateTimeOffset.Now;

    Console.WriteLine(now);

    // create a historical log for testing
    var log = new List<ValueTime>
            {
                new ValueTime { Ts = now.AddMilliseconds(-5000).ToString(), Balance = 1L },
                new ValueTime { Ts = now.AddMilliseconds(-4000).ToString(), Balance = 2L },
                new ValueTime { Ts = now.AddMilliseconds(-3000).ToString(), Balance = 4L }
            };

    var scheduler = new HistoricalScheduler();

    scheduler.AdvanceTo(DateTime.Parse(log[0].Ts));

    // historical part of the stream
    var replay = Observable.Generate(
            log.ToObservable().GetEnumerator(),
            events => events.MoveNext(),
            events => events,
            events => events.Current.Balance,
            events => DateTime.Parse(events.Current.Ts),
            scheduler);

    // create the real time part of the stream using a timer
    var realTime = Observable.Create<long>(observer =>
        {
            var timer = Observable.Interval(TimeSpan.FromSeconds(2)).Select(x => 10 + x);
            var disposable = timer.Subscribe(x => observer.OnNext(x));
            return disposable;
        });

    // combine the two streams
    var combined = replay
            .Concat(realTime)
            .Publish()
            .RefCount();

    combined.Timestamp(scheduler).Dump("Combined stream");

    // use the real time stream to set the time of the historical scheduler's
    realTime.Subscribe(_ =>
        {
            scheduler.AdvanceTo(DateTime.Now);
        },
        ex => Console.WriteLine(ex.Message),
        () => Console.WriteLine("Done"));

    var combinedRollingBuffer = combined
            .RollingBufferDeltaChange(TimeSpan.FromSeconds(bufferDuration), scheduler)
            .Publish()
            .RefCount();

    // use a rolling buffer of "bufferDuration" length 
    var combinedBuffer = combinedRollingBuffer
            //.Select(x => x.Sum());
            .Select(x => x.Last() - x.First());

    combinedBuffer.Timestamp(scheduler).Dump($"{bufferDuration}'' Rolling buffer aggregation");

    // display the values that are included in each window of the rolling buffer
    combinedRollingBuffer
    .Select(x => string.Join(",", x))
    .Timestamp(scheduler).Dump($"{bufferDuration}'' Rolling buffer lists");

    scheduler.Start();
}

class ValueTime
{
    public long Balance;
    public string Ts;
}

static class Extensions
{
    public static IObservable<T[]> RollingBufferDeltaChange<T>(
        this IObservable<T> @this,
        TimeSpan buffering, IScheduler scheduler)
    {
        return Observable.Create<
        T[]>(o =>
        {
            var list = new LinkedList<Timestamped<T>>();
            return @this.Timestamp(scheduler).Subscribe(tx =>
            {
                list.AddLast(tx);
                Console.WriteLine($"{scheduler.Now} Adding Tx: {tx.Timestamp}  {tx.Value} list.First: {list.First.Value.Timestamp} {list.First.Value.Value} list.Last: {list.Last.Value.Timestamp} {list.Last.Value.Value}");

                DateTime nowTime = scheduler.Now.DateTime; // DateTime.Now;

                System.Reactive.Timestamped<T> el = default(System.Reactive.Timestamped<T>);
                while (list.Count > 1 && list.First.Value.Timestamp < nowTime.Subtract(buffering))
                {
                    el = list.First.Value;
                    list.RemoveFirst();

                    Console.WriteLine($"{scheduler.Now} Removing el: {el.Timestamp}  {el.Value} {list.Count}");
                }

                if (el != default(System.Reactive.Timestamped<T>) && (list.Count <= 1 || list.First.Value.Timestamp > nowTime.Subtract(buffering)))
                {
                    list.AddFirst(el);
                    Console.WriteLine($"{scheduler.Now} Adding el: {el.Timestamp}  {el.Value} {list.Count}");
                    if (list.Count > 0)
                    {
                        Console.WriteLine($"{scheduler.Now} el: {el.Timestamp}  {el.Value} list.First: {list.First.Value.Timestamp} {list.First.Value.Value} list.Last: {list.Last.Value.Timestamp} {list.Last.Value.Value}");
                    }
                }

                o.OnNext(list.Select(tx2 => tx2.Value).ToArray());

            }, o.OnError, o.OnCompleted);
        });
    }
}

它产生了我正在寻找的行为:

在此处输入图像描述

如果除了我最初提到 James World 在Advanceable 历史流和 Rx 中的实时流中的答案之外,我还没有承认 Enigmativity 和他在响应式扩展滑动时间窗口中的答案,那将是一个很大的遗漏。如果没有他们的帮助,不仅在这些特定的帖子中,而且在许多其他帖子中,我都没有机会在我的代码中使用 Rx,所以非常感谢他们两个。

于 2017-03-20T03:19:34.417 回答