0

因此,我在这里询问了如何Throttle在运行查询的中间更改时间跨度,詹姆斯随后回答说存在过载,实际上也提供了一个示例(一切都很好,我也从那里学到了一些技术)。

在前一个周末,我编写了一段代码,其中Throttle间隔将由传入流本身定义。作为一个实际的例子,流可以是一系列定义如下的结构

struct SomeEvent
{
    public int Id;
    public DateTimeOffset TimeStamp;
}

然后接受流将检查TimeStamp字段并根据它们计算缺勤间隔。稍微改变一下 James 的链接示例,可以像这样生成流

Func<SomeEvent, IObservable<long>> throttleFactory = e => Observable.Timer(TimeSpan.FromTicks(throttleDuration.Ticks - (DateTimeOffset.Now.Ticks - e.TimeStamp.Ticks)));

var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => new SomeEvent { Id = 0, TimeStamp = DateTimeOffset.Now.AddTicks(-1) }).Throttle(throttleFactory);
var subscription = sequence.Subscribe(e => Console.WriteLine(e.TimeStamp));

时移,几个滴答,仅用于说明目的

然后我在这里有一个更详细的例子,同样,詹姆斯帮助了很多。简而言之,这里的想法是每个 ID 可能有“一个警报灯塔”(类似于交通信号灯),颜色像黄色和红色,它们轮流点亮,由没有多长时间来定义事件。然后当一个事件到来时,所有的灯都被关闭,“缺席计时器”又从零开始。

我遇到的问题是我似乎无法更改这个特定的示例,以便它使用这个想法来产生Throttle价值。特别是我似乎无法grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))在 James 的代码中在线很好进行分组。也许我在调试和所有方面都筋疲力尽了,但如果有人能推动正确的方向,我肯定会很感激!

有什么大主意?好吧,事件可以在源头加上时间戳,但是传输可能会增加需要考虑的延迟。从与分布式计算相关的 F# 用户组讨论(并且我自己对集成问题有点熟悉)来看,事件在某处加盖时间戳,然后通过不同的排队系统中继的场景会产生两种情况:

  1. 技术超时:在定义的时间间隔内,某个端点没有观察到任何事件。
  2. 业务超时:可能有大量事件,例如来自排队系统的时间性、持续性事件突发(甚至重复),但它们的时间戳记为“很久以前”。

<编辑:布兰登就我在2.中给出的例子提出了一个有效的观点。应该如何真正解释“业务超时”的缺失?如果事件尚未到达,则产生的唯一有效超时事件是“技术”之一如果它们确实以突发形式到达,接收者是否对事件之间的时间差感兴趣并希望相应地引发颜色事件? 或者是否应该根据业务事件中的时间戳重置计时器,然后当突发到达时,取最后一个的时间戳(同样,可能比允许的超时时间长)。它变得复杂和混乱,最好放弃这个作为例子。

写完之后,我仍然有兴趣知道如何执行 join in grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))。如果这变得复杂,我也倾向于将 Brandon 的帖子标记为答案(我觉得它可能会得到,我觉得分组相当复杂)。

4

1 回答 1

1

听起来节流不再是您想要的。这是你想要做的吗?

var alarms = events
    .GroupBy(e => e.Id)
    .SelectMany(grp =>
    {
        // Determine light color based on delay between events

        // go black if event arrives that is not stale
        var black = grp
            .Where(ev => (Date.Now - ev.TimeStamp) < TimeSpan.FromSeconds(2))
            .Select(ev => "black");

        // go yellow if no events after 1 second
        var yellow = black
            .Select(b => Observable.Timer(TimeSpan.FromSeconds(1)))
            .SwitchLatest()
            .Select(t => "yellow");

        // go red if no events after 2 seconds
        var red = black
            .Select(b => Observable.Timer(TimeSpan.FromSeconds(2)))
            .SwitchLatest()
            .Select(t => "red");

        return Observable
            .Merge(black, yellow, red)
            .Select(color => new { Id = grp.Key, Color = color });
    });
于 2013-10-29T16:20:54.163 回答