0

这是我的代码。

    SplitStream<MonitoringEvent> splitStream =  inputStream.split(new OutputSelector<MonitoringEvent>() {

    @Override
    public Iterable<String> select(MonitoringEvent me) {

        List<String> ml = new ArrayList<String>();              
        ml.add(me.getEventType());                              
        return ml;
}

我有随机顺序的监控事件流温度:80,压力:70,湿度:80,温度:30 ...

使用上面的代码,我正在拆分流、事件类型,即温度流、压力流。

问题是,如果我知道 eventType,我可以从 splitStream 中选择它

splitStream.select('temperatureStream')

但 eventType 是动态的,不是预定义的。

我将如何为这个动态流应用 CEP。CEP 就像,如果

temperate is > 90 for past 10 minutes ...

pressure is > 90 for past 10 minutes ...
4

1 回答 1

0

如果我错了,请纠正我,但我认为不可能对 select due flink 的并行性进行动态查找。你的程序被翻译成 flinks taskmanagers 的并行指令,jobmanager 协调这些动作。如果没有对您的抽象语法树的全面了解,根本就无法应用并行性......也许您可以找到所有消息共享和不同的一些共同属性

于 2017-03-07T15:08:55.603 回答