3

我的代码如下:

StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyObject> input = env.addSource(new MyCustomSource());

Pattern<MyObject, ?> pattern = Pattern.<MyObject>begin("start");

PatternStream<MyObject> patternStream = CEP.pattern(input, pattern);

...定义我的模式

DataStream<MyObject> resultStream = patternStream.select(new MyCustomPatternSelectFunction());

resultStream.addSink(new MyCustomSinkFunction(subscriptionCriteria));

try
  {
    env.execute();
  }
  catch (Exception exception)
  {
    log.debug("Error while ", exception);
  }

这段代码可以工作并且可以做我想做的事,我得到一个遵循我设置的模式的结果流。

我想知道的是是否可以将新模式应用于我稍后添加到环境中的源,从而获得与不同模式匹配的不同结果流,而无需再次调用 env.execute() 因为当我这样做时除了我的新结果流之外,我还得到了多余的旧结果流(即旧模式被多次执行)?

4

1 回答 1

1

目前 Flink 的 CEP 库不支持开箱即用的动态模式更改。因此,一旦你定义了你的模式并开始你的工作,它只会处理这个定义的模式。

但是,您可以编写自己的运算符来实现TwoInputStreamOperator接口,该接口在一个输入模式定义和另一个输入上接收流记录(类似于 CoFlatMap 函数)。然后,对于每个新模式,您都必须NFA在运算符上编译一个新模式,并将任何新的传入流元素也提供给它NFA。这样,您就可以实现您的预​​期行为。

未来,我们很可能会将此功能添加到 Flink 的 CEP 库中。

于 2016-12-05T09:36:07.487 回答