0

我开始使用 StreamInsight 并将其用作 wcf 服务。我已经尝试在“ A Hitchhiker's Guide to Microsoft StreamInsight Queries ”中寻求帮助,并尝试了这些示例以及 codeplex 中的示例。

我的问题是这样的:

我的事件生产者向适配器提供 AlertEvent 的:

public sealed class AlertEvent
{
    public DateTime Date { get; set; }
    public long IDGroup { get; set; }
    public bool IsToNormalize { get; set; }
    public bool IsError { get; set; }
}

当 AlertEvent 的 IsError = false 时,标志 IsToNormalize 为 true;

我试图实现的行为是,当我收到带有 IsError 的流时,我想看看在接下来的“x”分钟内,是否有任何带有 IsToNormalize 的 alertEvent 到达。然后我发送以输出开始搜索的 IsError AlarmEvent。

我所做的是,当我收到与过滤器相对应的输入时,我会在“x”分钟内延长其生命周期并创建一个 TumblingWindow 以查看在此期间是否有另一个 AlertEvent 与另一个标志一起到达(使用 ExtensionMethod 进行迭代通过窗口中的所有有效负载)。

var stream= from item in input
            where item.IsError
            group item by item.IdGroup into eachGroup
            from window in eachDigital.AlterEventDuration(e => TimeSpan.FromMinutes((double)1.5)).TumblingWindow(TimeSpan.FromSeconds(15), HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new
            {
                Id = eachDigital.Key,
                NormEvents = window.HasNormalizationEvents(),
                Count = window.Count()
            };

然后,为了获得触发 TumblingWindow 的 AlarmEvent,我与原始输入进行了连接。

var resultStream = from e1 in stream
                   join e2 in input
                   on e1.Id equals e2.DigitalTag
                   where e1.NormEvents != 0
                   select e2;

这根本不起作用......:/有什么想法可以帮助解决这个问题吗?

我的另一个疑问是,是否会为每个通过过滤器的输入创建一个带有新 startDate 的新窗口,或者是否只创建一个 tumblingWindow。

谢谢。

4

1 回答 1

2

尝试这个:

// Move all error events
// to the point where the potential timeout would occur.
var timedOut = input
                   .Where(e => e.IsError == true)
                   .ShiftEventTime(e => e.StartTime + TimeSpan.FromMinutes(5));

// Extend all events IsToNormalize by the timeout.
var following = input
                   .Where(e => e. IsToNormalize == true)
                   .AlterEventDuration(e => TimeSpan.FromMinutes(5));

// Mask away the potential timeout events by the extended events.
// - If IsToNormalize did not occur within the timeout, then the shifted error event
// will remain unchanged by the left-anti-semi-join and represent
// the desired output.
var result = from t in timedOut
             where (from c in following
                    where t.IdGroup == c.IdGroup
                    select c).IsEmpty()
             select t; // or some projection on t
于 2011-05-27T17:44:19.420 回答