我的代码如下:
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyObject> input = env.addSource(new MyCustomSource());
Pattern<MyObject, ?> pattern = Pattern.<MyObject>begin("start");
PatternStream<MyObject> patternStream = CEP.pattern(input, pattern);
...定义我的模式
DataStream<MyObject> resultStream = patternStream.select(new MyCustomPatternSelectFunction());
resultStream.addSink(new MyCustomSinkFunction(subscriptionCriteria));
try
{
env.execute();
}
catch (Exception exception)
{
log.debug("Error while ", exception);
}
这段代码可以工作并且可以做我想做的事,我得到一个遵循我设置的模式的结果流。
我想知道的是是否可以将新模式应用于我稍后添加到环境中的源,从而获得与不同模式匹配的不同结果流,而无需再次调用 env.execute() 因为当我这样做时除了我的新结果流之外,我还得到了多余的旧结果流(即旧模式被多次执行)?