查询如下(这些是草稿查询,可能需要稍作修改才能运行)
- 要在 5 分钟内检测传感器 1,然后检测传感器 2(假设 sensorSram 具有 id、值),您可以简单地使用如下模式和 'within' 关键字:
from e1=sensorStream[sensorId == '1'] -> e2=sensorStream[sensorId == '2']
select 'composite activity detected' as description, e1.value as sensor1Value, e2.value as sensor2Value
within 5 minutes
insert into compositeActivityStream;
- 要检测未发生(id=1 到达,但 5 分钟内没有 id=2),我们可以有以下两个查询:
from sensorStream[sensorId == '1']#window.time(5 minutes)
select *
insert into delayedSensor1Stream for expired-events;
from e1=sensorStream[sensorId == '1'] -> nonOccurringEvent = sensorStream[sensorId == '2'] or delayedEvent=delayedSensor1Stream
select 'id=2 not found' as description, e1.value as id1Value, nonOccurringEvent.sensorId as nonOccurringId
having (not(nonOccurringId instanceof string))
insert into nonOccurrenceStream;
这将在 id=1 事件到达 5 分钟后立即检测到未发生的事件。有关上述逻辑的解释,请查看cep 4.0.0 的非出现示例(语法有点不同,但想法相同)
- 现在,由于您需要定期生成报告,我们需要另一个查询。为方便起见,我假设您需要每 6 小时(360 分钟)报告一次,并在此处使用时间批处理窗口。或者,使用新的 CEP 4.0.0,您可以使用“Cron 窗口”在特定时间生成它,这更适合您的用例。
from nonOccurrenceStream#window.timeBatch(360 minutes)
select count(id1Value) as nonOccurrenceCount
insert into nonOccurrenceReportsStream for expired-events;
对于此用例,您可以使用 http 输入/输出适配器并使用 json 构建器和格式化程序进行 json 映射。