我正在使用 Flink 1.2 CEP 来检测设备丢失的心跳事件。我从 RabbitMQ 源读取心跳事件,并使用以下模式通过超时功能检测由序列号键入的设备丢失的心跳。
此模式流适用于从设备发送至少单个心跳的情况。但是我还需要处理检测设备丢失心跳的用例,该设备在应用程序启动后甚至没有启动一次心跳。
为此,我需要使用所有设备初始化心跳事件来初始化输入心跳流。如果我初始化流,这将处理没有收到第一次心跳的设备也会超时并发出警报。
即使在从 RMQSource 函数侦听之前,如何使用所有设备的 init heartbeat 数据初始化数据流?
//Reading heart beat of device from RabbitMQ queue
DataStream<HeartBeatEvent> heartBeatStream=
env.addSource(rmqSource).assignTimestampsAndWatermarks(new
IngestionTimeExtractor<String>());
//Pattern to detect missing heartbeat
Pattern<HeartBeanEvent, ?> heartBeatEventPattern = Pattern.<
HeartBeanEvent >begin("first")
.subtype(HeartBeanEvent.class)
.next("second")
.subtype(HeartBeanEvent.class)
.within(Time.seconds(360));
DataStream<Either< HeartBeanEvent, String>> result =
CEP.pattern(heartBeatStream.keyBy(serialNum),
heartBeatEventPattern).
select(new PatternTimeoutFunction< HeartBeanEvent, HeartBeanEvent >() {
public HeartBeanEvent timeout(Map<String, HeartBeanEvent >
pattern, long timeoutTimestamp) throws Exception {
System.out.println("Missing heart beat:" +
pattern.get("first").getSerialNum() + ":" +
pattern.get("first").getEventTime());
return pattern.get("first");
}
},new PatternSelectFunction< HeartBeanEvent, String>() {
public String select(Map<String, HeartBeanEvent > pattern) {
return null;
}
});