0

我正在尝试在 siddhi 中的输入流上实现一个基本窗口。这是窗口查询

executionPlan = "" +
                "define stream inputStream (height int); " +
                "" +
                "@info(name = 'query1') " +
                "from inputStream #window.length(5) " + 
                "select avg(height) as avgHt " + 
                "insert into outputStream ;";

这就是我向输入流提供数据的方式。

    Object[] obj1 = {10};
    Object[] obj2 = {5};
    for (int i=0;i<10;i++) {
        try {
            inputHandler.send(obj1);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    for (int i=0;i<20;i++) {
        try {
            inputHandler.send(obj2);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

假设查询应该在每次输入到inputHandler. 所以对于这个例子,初始输出应该是 10,然后它应该逐渐减少并变成 5。在我发送了所有 10 和 2 个 5 的时候,我应该得到一个平均为 (10+10+10+5) 的回调+5)/5= 8。但目前没有发生这种情况。对于这个实现,我得到两个平均分别为 10 和 5 的回调。为什么没有从 10 逐渐减少到 5?

这就是我添加回调的方式

executionPlanRuntime.addCallback("query1", new QueryCallback() {
        @Override
        public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
            // printing inEvents
            EventPrinter.print(inEvents);

    });

我在这里想念什么?

4

1 回答 1

1

由于您是突发发送事件,因此它是其中的批处理事件。但是,如果您在发送的事件之间添加 Thread.Sleep(100),那么它将按预期输出。

于 2015-12-14T15:45:16.433 回答