1

(我从 Esper 跳到 Siddhi)

我正在尝试在同一流中部署多个查询。此处的示例https://docs.wso2.com/display/CEP400/Using+Siddhi+as+a+Library展示了如何在一个流中部署查询,完全在一个 ExecutionPlan 中。那么,如果我想将查询添加到同一个执行计划或同一个流中呢?

我有兴趣在语法上这样做,Java 代码。

更新

我想在现有执行计划中添加查询。这意味着当流到达时,而不是之前。使用作为答案发布的示例:

SiddhiManager siddhiManager = new SiddhiManager();

String executionPlan = "" +
                       "@Plan:name('demo') " +
                       "" +
                       "define stream cseEventStream (symbol string, price float, volume long);" +
                       "" +
                       "from cseEventStream[symbol==\"WSO2\"] " +
                       "insert into wso2Stream;" +
                       "" +
                       "from cseEventStream[symbol==\"ABC\"] " +
                       "insert into abcStream;";

ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);

StreamCallback streamCallback = new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        EventPrinter.print(events);
    }
};

executionPlanRuntime.addCallback("wso2Stream", streamCallback);
//Similarly, we can add another call back for abcStream

streamCallback.startProcessing();

InputHandler inputHandler = executionPlanRuntime.getInputHandler("cseEventStream");

executionPlanRuntime.start();
// HERE: ADD new query <---------------------------------------------
inputHandler.send(new Object[]{"WSO2", 700f, 100l});
inputHandler.send(new Object[]{"ABC", 60.5f, 200l});
inputHandler.send(new Object[]{"WSO2", 60.5f, 200l});

streamCallback.stopProcessing();
executionPlanRuntime.shutdown();  
4

1 回答 1

1

cseEventStream以下示例在同一执行计划中为同一 Stream ( )添加多个查询。

SiddhiManager siddhiManager = new SiddhiManager();

String executionPlan = "" +
                       "@Plan:name('demo') " +
                       "" +
                       "define stream cseEventStream (symbol string, price float, volume long);" +
                       "" +
                       "from cseEventStream[symbol==\"WSO2\"] " +
                       "insert into wso2Stream;" +
                       "" +
                       "from cseEventStream[symbol==\"ABC\"] " +
                       "insert into abcStream;";

ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);

StreamCallback streamCallback = new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        EventPrinter.print(events);
    }
};

executionPlanRuntime.addCallback("wso2Stream", streamCallback);
//Similarly, we can add another call back for abcStream

streamCallback.startProcessing();

InputHandler inputHandler = executionPlanRuntime.getInputHandler("cseEventStream");

executionPlanRuntime.start();

inputHandler.send(new Object[]{"WSO2", 700f, 100l});
inputHandler.send(new Object[]{"ABC", 60.5f, 200l});
inputHandler.send(new Object[]{"WSO2", 60.5f, 200l});

streamCallback.stopProcessing();
executionPlanRuntime.shutdown();

我使用了 Siddhi 版本 3.0.6-beta2。

此代码示例是对其中一个Siddhi Passthrough 测试用例( PassThroughTest4) 的修改。

更新:

查看ExecutionPlanRuntime 类,似乎不可能“在运行中”添加查询。

因此,据我了解,您将不得不关闭当前的执行计划运行时,添加这些新查询并重新启动它。

于 2016-06-09T06:53:38.267 回答