4

在前几天,我一直在尝试编写一个 Rx 查询来处理来自源的事件流并检查是否缺少某些 ID。缺席被定义为存在一系列时间窗口(例如,从 9:00 到 17:00 的所有时间),在这些时间窗口期间,最多应该有 20 分钟没有 ID 出现在流中。更复杂的是,应根据 ID 定义缺勤时间。例如,假设三种事件 A、B 和 C 出现在事件组合流(A、A、B、C、A、C、B 等)中,可以定义为

  • 每天从 9:00 到 10:00 监控事件,最长无事件时间为 10 分钟。
  • 每天 9:00 至 11:00 监控 B 事件,最长无事件时间为 5 分钟。
  • 每天 12:00 至 15:00 监控 C 事件,最长无事件时间为 30 分钟。

我认为我需要首先将流划分为GroupBy分隔事件,然后使用缺席规则处理生成的单独流。我已经在Microsoft Rx 论坛上仔细考虑了这一点(非常感谢 Dave),并且我有一些工作代码来生成规则并进行缺勤检查,但是我很努力,例如,如何将它与分组结合起来。

所以,没有进一步的演讲,到目前为止被黑的代码:

//Some sample data bits representing the events.
public class FakeData
{
    public int Id { get; set; }

    public string SomeData { get; set; }
}

//Note the Now part in DateTime to zero the clock time and have only the date. The  purpose is to create start-end pairs of times, e.g. 9:00-17:00.
//The alarm start and end time points should match themselves pairwise, could be pairs of values...
var maxDate = DateTime.Now.Date.AddHours(17).AddMinutes(0).AddSeconds(0).AddDays(14);
var startDate = DateTime.Now.Date.AddHours(9).AddMinutes(0).AddSeconds(0);
var alarmStartPeriods =   Enumerable.Range(0, 1 + (maxDate - startDate).Days).Select(d => new DateTimeOffset(startDate.AddDays(d))).ToList();
var alarmEndPeriods = Enumerable.Range(0, 1 + (maxDate - startDate).Days).Select(d => new DateTimeOffset(startDate.AddDays(d)).AddHours(5)).ToList();

还有一个查询可以在不分组的情况下进行缺勤检查,这是我的症结之一。<edit:也许我应该将时间点成对分组并添加一个 ID 并在查询中使用生成的三元组... </edit>

dataSource = from n in Observable.Interval(TimeSpan.FromMilliseconds(100))
             select new FakeData
             {
                 Id = new Random().Next(1, 5),
                 SomeData = DateTimeOffset.Now.ToString()
             };

var startPointOfTimeChanges = alarmStartPeriods.ToObservable();
var endPointOfTimeChanges = alarmEndPeriods.ToObservable();
var durations = startPointOfTimeChanges.CombineLatest(endPointOfTimeChanges, (start, end) => new { start, end });
var maximumInactivityTimeBeforeAlarmSignal =  TimeSpan.FromMilliseconds(250);

timer = (from duration in durations
         select (from _ in Observable.Timer(DateTime.Now)
                 from x in dataSource.Throttle(maximumInactivityTimeBeforeAlarmSignal).TakeUntil(duration.end)
                 select x)).Switch();

timer.Subscribe(x => Debug.WriteLine(x.SomeData));

问题:

  • 我应该如何尝试按 ID 对传入数据进行分组,并且仍然能够定义没有事件?
  • 我注意到的一件事是,如果警报周期的起点在过去(例如,查询在 10:00 开始,当规则说在 9:00 开始监控时),查询将不会开始。我想应该把开始时间推到现在时间。有一些标准的方法可以做到这一点,还是我应该只引入一个条件?

我能想到的其他问题会很好(自娱自乐:)):

  • 如何随时了解每个 ID 发生的最新事件?
  • 如何动态更改变量(正如 Dave 在 MS 论坛中已经提到的那样)?
  • 然后,最后,批处理事件并存储在某个地方(例如数据库),就像PeteGoo 博客中这个奇妙的例子一样?

我能想到的其他选项是明确使用

System.Threading.Timers
ConcurrentDictionary
,但需要不断学习!

关于詹姆斯的输入答案,这里有一个快速解释它是如何工作的以及我打算如何使用它。

首先,在第一个事件到来之前,observable 不会做任何事情。因此,如果应该立即开始监视,则需要添加一些其他 Rx 功能或触发一个虚拟事件。没问题,我相信。

其次,将从alarmInterval 中为任何新ID 获取一个新的超时变量。在这里,甚至是一个已经离开太久并触发了警报的新人。

我认为这很有效,因为人们可以订阅这个 observable 并做一些有副作用的事情。一些例子就像设置一个标志,发送一个信号以及什么业务规则有一个。此外,保持适当的锁定等,应该很容易根据预定义的警报规则提供新的时间跨度,并具有分离的缺勤期和时间窗口。

我将不得不研究与此相关的其他概念,以便更好地掌握事物。但我主要担心的是对此感到满意。生活很好,很好。:-)

4

1 回答 1

3

已编辑 - 改进了代码,简化了SelectMany使用TakeLast.

我写了一篇关于检测断开连接的客户端的博客文章 - 如果您将帖子中的 timeToHold 变量替换为下面的 alarmInterval 之类的函数以根据客户端 ID 获取节流时间跨度,那么这对于您的场景同样适用。

例如:

// idStream is an IObservable<int> of the input stream of IDs
// alarmInterval is a Func<int, TimeSpan> that gets the interval given the ID
var idAlarmStream = idStream
    .GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key)))
    .SelectMany(grp => grp.TakeLast(1));

这为您提供了持续监控的基本功能,而无需查看活动监控时段。

为了获得监控窗口功能,我会扭转局面并使用 WHERE 过滤上述输出,以检查发出的 ID 是否落在其监控时间窗口内。这使得更容易处理不断变化的监控周期。

您可以通过将每个监视窗口变成一个流并将它们与警报流组合来做一些更有趣的事情,但我不相信额外复杂性的好处。

alarmInterval 函数还将为您提供动态警报间隔元素,因为它可以返回新值,但这些只有在该 ID 的警报响起从而结束其当前组后才会生效。

---在这里开始一些理论---

要获得这种完全动态,您将不得不以某种方式结束该组 - 您可以通过几种方式做到这一点。

一种是使用 Select 将 idStream 投影到包含 ID 和全局计数器值的自定义类型的流中。给这个类型一个适当的相等实现,这样它就可以正确地与 GroupByUntil 一起工作。

现在每次更改警报间隔时,请更改计数器。这将导致为每个 ID 创建新组。然后,您可以在最终过滤器中添加额外的检查,以确保输出事件具有最新的计数器值。

于 2013-10-11T09:40:53.790 回答