我正在尝试在 siddhi 中的输入流上实现一个基本窗口。这是窗口查询
executionPlan = "" +
"define stream inputStream (height int); " +
"" +
"@info(name = 'query1') " +
"from inputStream #window.length(5) " +
"select avg(height) as avgHt " +
"insert into outputStream ;";
这就是我向输入流提供数据的方式。
Object[] obj1 = {10};
Object[] obj2 = {5};
for (int i=0;i<10;i++) {
try {
inputHandler.send(obj1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int i=0;i<20;i++) {
try {
inputHandler.send(obj2);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
假设查询应该在每次输入到inputHandler
. 所以对于这个例子,初始输出应该是 10,然后它应该逐渐减少并变成 5。在我发送了所有 10 和 2 个 5 的时候,我应该得到一个平均为 (10+10+10+5) 的回调+5)/5= 8。但目前没有发生这种情况。对于这个实现,我得到两个平均分别为 10 和 5 的回调。为什么没有从 10 逐渐减少到 5?
这就是我添加回调的方式
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
// printing inEvents
EventPrinter.print(inEvents);
});
我在这里想念什么?