我开始使用 StreamInsight 并将其用作 wcf 服务。我已经尝试在“ A Hitchhiker's Guide to Microsoft StreamInsight Queries ”中寻求帮助,并尝试了这些示例以及 codeplex 中的示例。
我的问题是这样的:
我的事件生产者向适配器提供 AlertEvent 的:
public sealed class AlertEvent
{
public DateTime Date { get; set; }
public long IDGroup { get; set; }
public bool IsToNormalize { get; set; }
public bool IsError { get; set; }
}
当 AlertEvent 的 IsError = false 时,标志 IsToNormalize 为 true;
我试图实现的行为是,当我收到带有 IsError 的流时,我想看看在接下来的“x”分钟内,是否有任何带有 IsToNormalize 的 alertEvent 到达。然后我发送以输出开始搜索的 IsError AlarmEvent。
我所做的是,当我收到与过滤器相对应的输入时,我会在“x”分钟内延长其生命周期并创建一个 TumblingWindow 以查看在此期间是否有另一个 AlertEvent 与另一个标志一起到达(使用 ExtensionMethod 进行迭代通过窗口中的所有有效负载)。
var stream= from item in input
where item.IsError
group item by item.IdGroup into eachGroup
from window in eachDigital.AlterEventDuration(e => TimeSpan.FromMinutes((double)1.5)).TumblingWindow(TimeSpan.FromSeconds(15), HoppingWindowOutputPolicy.ClipToWindowEnd)
select new
{
Id = eachDigital.Key,
NormEvents = window.HasNormalizationEvents(),
Count = window.Count()
};
然后,为了获得触发 TumblingWindow 的 AlarmEvent,我与原始输入进行了连接。
var resultStream = from e1 in stream
join e2 in input
on e1.Id equals e2.DigitalTag
where e1.NormEvents != 0
select e2;
这根本不起作用......:/有什么想法可以帮助解决这个问题吗?
我的另一个疑问是,是否会为每个通过过滤器的输入创建一个带有新 startDate 的新窗口,或者是否只创建一个 tumblingWindow。
谢谢。