3

如果能够根据事件的字段值匹配事件,这将超出当前从匹配单独条件的事件中创建模式的能力。例如,如https://flink.apache.org/news/2016/04/06/cep-monitoring.html中所述,我们可以这样做:

Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("First Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .next("Second Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .within(Time.seconds(10));

但是,使用以下功能创建 Pattern 会很棒:.where(second_evt->evt.getTemperature() == first_evt->evt.getTemperature()

4

1 回答 1

0

如果您想比较不同事件中字段的值,您可以在flatSelect方法中进行,只需使用非常简单的模式,无需任何where表达式:

  1. 创建一个模式:

    Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("First Event")
        .subtype(TemperatureEvent.class)
        .next("Second Event")
        .subtype(TemperatureEvent.class)
        .within(Time.seconds(10));
    
  2. 将模式应用于数据流:

    PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
        inputEventStream.keyBy("rackID"),
        warningPattern);
    
  3. 检查值并通过flatSelect方法生成一个新的复杂事件:

    DataStream<TemperatureWarning> warnings = tempPatternStream.flatSelect(
        (Map<String, MonitoringEvent> pattern, Collector<TemperatureAlert> out) -> {
            TemperatureEvent first = (TemperatureEvent) pattern.get("First Event");
            TemperatureEvent second = (TemperatureEvent) pattern.get("Second Event");
    
            if (first.getTemperature() <= second.getTemperature()) {
                out.collect(new TemperatureWarning(
                    first.getRackID(), 
                    (first.getTemperature() + second.getTemperature()) / 2));
            }
        });
    
于 2017-02-07T18:21:13.600 回答