在前几天,我一直在尝试编写一个 Rx 查询来处理来自源的事件流并检查是否缺少某些 ID。缺席被定义为存在一系列时间窗口(例如,从 9:00 到 17:00 的所有时间),在这些时间窗口期间,最多应该有 20 分钟没有 ID 出现在流中。更复杂的是,应根据 ID 定义缺勤时间。例如,假设三种事件 A、B 和 C 出现在事件组合流(A、A、B、C、A、C、B 等)中,可以定义为
- 每天从 9:00 到 10:00 监控事件,最长无事件时间为 10 分钟。
- 每天 9:00 至 11:00 监控 B 事件,最长无事件时间为 5 分钟。
- 每天 12:00 至 15:00 监控 C 事件,最长无事件时间为 30 分钟。
我认为我需要首先将流划分为GroupBy分隔事件,然后使用缺席规则处理生成的单独流。我已经在Microsoft Rx 论坛上仔细考虑了这一点(非常感谢 Dave),并且我有一些工作代码来生成规则并进行缺勤检查,但是我很努力,例如,如何将它与分组结合起来。
所以,没有进一步的演讲,到目前为止被黑的代码:
//Some sample data bits representing the events.
public class FakeData
{
public int Id { get; set; }
public string SomeData { get; set; }
}
//Note the Now part in DateTime to zero the clock time and have only the date. The purpose is to create start-end pairs of times, e.g. 9:00-17:00.
//The alarm start and end time points should match themselves pairwise, could be pairs of values...
var maxDate = DateTime.Now.Date.AddHours(17).AddMinutes(0).AddSeconds(0).AddDays(14);
var startDate = DateTime.Now.Date.AddHours(9).AddMinutes(0).AddSeconds(0);
var alarmStartPeriods = Enumerable.Range(0, 1 + (maxDate - startDate).Days).Select(d => new DateTimeOffset(startDate.AddDays(d))).ToList();
var alarmEndPeriods = Enumerable.Range(0, 1 + (maxDate - startDate).Days).Select(d => new DateTimeOffset(startDate.AddDays(d)).AddHours(5)).ToList();
还有一个查询可以在不分组的情况下进行缺勤检查,这是我的症结之一。<edit:也许我应该将时间点成对分组并添加一个 ID 并在查询中使用生成的三元组... </edit>
dataSource = from n in Observable.Interval(TimeSpan.FromMilliseconds(100))
select new FakeData
{
Id = new Random().Next(1, 5),
SomeData = DateTimeOffset.Now.ToString()
};
var startPointOfTimeChanges = alarmStartPeriods.ToObservable();
var endPointOfTimeChanges = alarmEndPeriods.ToObservable();
var durations = startPointOfTimeChanges.CombineLatest(endPointOfTimeChanges, (start, end) => new { start, end });
var maximumInactivityTimeBeforeAlarmSignal = TimeSpan.FromMilliseconds(250);
timer = (from duration in durations
select (from _ in Observable.Timer(DateTime.Now)
from x in dataSource.Throttle(maximumInactivityTimeBeforeAlarmSignal).TakeUntil(duration.end)
select x)).Switch();
timer.Subscribe(x => Debug.WriteLine(x.SomeData));
问题:
- 我应该如何尝试按 ID 对传入数据进行分组,并且仍然能够定义没有事件?
- 我注意到的一件事是,如果警报周期的起点在过去(例如,查询在 10:00 开始,当规则说在 9:00 开始监控时),查询将不会开始。我想应该把开始时间推到现在时间。有一些标准的方法可以做到这一点,还是我应该只引入一个条件?
我能想到的其他问题会很好(自娱自乐:)):
- 如何随时了解每个 ID 发生的最新事件?
- 如何动态更改变量(正如 Dave 在 MS 论坛中已经提到的那样)?
- 然后,最后,批处理事件并存储在某个地方(例如数据库),就像PeteGoo 博客中这个奇妙的例子一样?
我能想到的其他选项是明确使用
System.Threading.Timers
和ConcurrentDictionary
,但需要不断学习!
关于詹姆斯的输入答案,这里有一个快速解释它是如何工作的以及我打算如何使用它。
首先,在第一个事件到来之前,observable 不会做任何事情。因此,如果应该立即开始监视,则需要添加一些其他 Rx 功能或触发一个虚拟事件。没问题,我相信。
其次,将从alarmInterval 中为任何新ID 获取一个新的超时变量。在这里,甚至是一个已经离开太久并触发了警报的新人。
我认为这很有效,因为人们可以订阅这个 observable 并做一些有副作用的事情。一些例子就像设置一个标志,发送一个信号以及什么业务规则有一个。此外,保持适当的锁定等,应该很容易根据预定义的警报规则提供新的时间跨度,并具有分离的缺勤期和时间窗口。
我将不得不研究与此相关的其他概念,以便更好地掌握事物。但我主要担心的是对此感到满意。生活很好,很好。:-)