2

Siddhi CEP 的新手。除了 WS02 CEP 上的常规文档之外,有人可以指出一个很好的教程。

这是我们的要求。指出有关编写此类查询的正确方法的一些线索。

  • 拥有单一的传感器设备通知流(物联网应用程序)。
  • 流输入是通过 REST-JSON 并且输出也将被格式化为 REST-JSON。(希望这在 WS02 CEP 3.1 上是可能的)

所需的执行计划种类: - 如果设备通知报告传感器 1 的使用情况,则在 5 分钟内监控设备通知是否也报告传感器 2 的使用情况。如果找到,则在 REST-JSON 上生成报告复合活动的输出流。
- 如果在上午、下午和晚上的时间段内未检测到此类复合活动,则在 REST-JSON 上生成警告事件流状态。(那么如何找到未及时发生的事件) - 如果在早上、下午和晚上的某些时间段内未找到此类复合活动,则在 REST-JSON 上报告 failure1-event-stream 状态。

这应该每天都在工作,那么之前处理的数据将如何在 WSO2 CEP 中被删除。

问候, 阿米特

4

1 回答 1

2

查询如下(这些是草稿查询,可能需要稍作修改才能运行)

  1. 要在 5 分钟内检测传感器 1,然后检测传感器 2(假设 sensorSram 具有 id、值),您可以简单地使用如下模式和 'within' 关键字:

from e1=sensorStream[sensorId == '1'] -> e2=sensorStream[sensorId == '2']

select 'composite activity detected' as description, e1.value as sensor1Value, e2.value as sensor2Value

within 5 minutes

insert into compositeActivityStream;

  1. 要检测未发生(id=1 到达,但 5 分钟内没有 id=2),我们可以有以下两个查询:

from sensorStream[sensorId == '1']#window.time(5 minutes)

select *

insert into delayedSensor1Stream for expired-events;


from e1=sensorStream[sensorId == '1'] -> nonOccurringEvent = sensorStream[sensorId == '2'] or delayedEvent=delayedSensor1Stream

select 'id=2 not found' as description, e1.value as id1Value, nonOccurringEvent.sensorId as nonOccurringId

having (not(nonOccurringId instanceof string))

insert into nonOccurrenceStream;


这将在 id=1 事件到达 5 分钟后立即检测到未发生的事件。有关上述逻辑的解释,请查看cep 4.0.0 的非出现示例(语法有点不同,但想法相同)

  1. 现在,由于您需要定期生成报告,我们需要另一个查询。为方便起见,我假设您需要每 6 小时(360 分钟)报告一次,并在此处使用时间批处理窗口。或者,使用新的 CEP 4.0.0,您可以使用“Cron 窗口”在特定时间生成它,这更适合您的用例。

from nonOccurrenceStream#window.timeBatch(360 minutes)

select count(id1Value) as nonOccurrenceCount

insert into nonOccurrenceReportsStream for expired-events;


对于此用例,您可以使用 http 输入/输出适配器并使用 json 构建器和格式化程序进行 json 映射。

于 2015-08-06T04:54:15.033 回答