0

如果两个事件流入 Flink,它们是否可以使用随后的第三个事件中的信息进行逻辑连接(使用 DataStream API 或 CEP)?例如,下例中的第三个事件能否用于根据其 right_id 和 left_id 链接前两个事件?

ID: AAAA
ID: BBBB
ID: ZZZZ, right_id: AAAA, left_id: BBBB 
4

1 回答 1

1

这是一个非常基本的 CEP 用例。代码看起来像这样......

// data stream creation 
DataStream<Event> myStream = ... 

// cep pattern definition
Pattern<Event, ?> myPattern = Pattern.<Event>begin("first_event")
                .followedBy("second_event")
                .followedBy("match_event");

// cep pattern stream: apply pattern to stream
PatternStream<Event> myPatternStream = CEP.pattern(myStream, myPattern);

// create new data stream from pattern matches
DataStream<CEPEvent> myCEPEvent = myPatternStream.flatSelect(
                (Map<String, Event> pattern, Collector<CEPEvent> out) -> {

                // load potential event sequence matches
                Event first_event = pattern.get("first_event");
                Event second_event = pattern.get("second_event");
                Event match_event = pattern.get("match_event");

                // test event sequences 
                if (match_event.right_id.equals(first_event.ID)
                    && match_event.left_id.equals(second_event.ID)
                ){out.collect(new CEPEvent("successful cep hit"));}
            }
        );
于 2016-12-15T19:18:42.060 回答