我观察到我的 StreamInsight 应用程序中的事件处理非常奇怪。有一个 InputAdapter 将 Stream 划分为 TumblingWindows。然后我有多个查询同时运行。他们应该使用来自同一个 Stream 的所有相同的 TumblingWindows。我用这段代码来定义窗口:
var atgs = new AdvanceTimeGenerationSettings(config.Input.EventCount,
TimeSpan.FromSeconds(config.Input.Delay), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);
var dstream = CepStream<Dataclass>.Create("Data Input Stream", typeof (InAdapterFactory),
config.Input, EventShape.Point, ats);
var unfilteredtumbling = dstream.TumblingWindow(TimeSpan.FromSeconds(processinginterval),HoppingWindowOutputPolicy.ClipToWindowEnd);
然后我从这个流中执行两个不同的查询。使用此代码:
var count = from row in unfilteredtumbling
select new
{
value= row.Count(),
qind = 10,
stat = "Calculated Count"
};
var count2 = from row in unfilteredtumbling
select new
{
value= row.Count()*2,
qind = 10,
stat = "Calculated Count2"
};
像这样将每个绑定到自己的 OutputAdapter:
Query querycount = count.ToQuery(myApplication, "Count Output Query", "Output Count",
typeof (OutputAdapterFactory), config.Output, EventShape.Point, StreamEventOrder.FullyOrdered);
Query querycount2 = count2.ToQuery(myApplication, "Count Output Query2", "Output Count2",
typeof (OutputAdapterFactory), config.Output, EventShape.Point, StreamEventOrder.FullyOrdered);
以下链接显示了我收到的输出。
https://dl.dropboxusercontent.com/u/15482726/outputissue.jpg
不幸的是,我收到的输出不是我所期望的。看起来每个查询都有自己的输入适配器。并且消息被分发到两个输入适配器。即使 dstream 只创建一次,但工厂被调用了两次。但为什么以及何时?这怎么可能?如果我只使用一个查询,一切都会完美。
我使用了这个链接的解释http://technet.microsoft.com/en-us/library/ff518536.aspx 认为它应该这样工作。
非常欢迎任何帮助。
最好的问候乔