我们试图实现每小时事件计数的时间序列。我们想在 CEP 中进行每小时计数并将输出存储在数据存储/nosql 中。缺少的是我们想要存储的是某一天每小时的计数。
为此,我们需要在每次时间批处理窗口到期时从 CEP 输出当前时间戳。
有人可以解释如何使用 WSO2 CEP 实现这一目标吗?
谢谢 反相
我认为您应该使用 BAM 而不是 CEP,因为您正在尝试做的事情看起来更像是一个 map-reduce 工作。
你给它看了吗?
希望您使用的是 WSO2 CEP 3.1.0。目前 WSO2 CEP 4.0.0 正在开发中,一旦 WSO2 CEP 发布,将会有一个 RDBMS 发布者(输出适配器),您可以在其中指定连接并直接发布输出流值。
您可以有一个带有 siddhi 查询的执行计划来实现时间戳逻辑。要了解有关 siddhi 查询语言的更多信息,请参阅 WSO2 Siddhi 文档。在此处输入链接描述
以下是带有 siddhi 查询的示例执行计划,用于检查给定时间窗口(1 分钟)内的室温值,并将平均温度与房间号一起写入输出流。如果您想将它们存储在数据库中,您可以为输出流使用 RDBMS 发布者(输出适配器)。
/* Enter a unique ExecutionPlan */
@Plan:name('testPlan')
/* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan')
/* define streams and write query here ... */
@Import('inStream:1.0.0')
define stream inStream (temperature double, roomNumber int);
@Export('outStream:1.0.0')
define stream outStream (temperature double, roomNumber int);
from inStream#window.time(1 min)
select avg(temperature) as temperature,roomNumber
group by roomNumber
having temperature>= 70
insert into outStream;
您可以time:currentTime()
在选择中使用扩展来获取 Siddhi 3.0/WSO2CEP 4.0 的时间窗口的到期时间。有关示例,请查看此测试用例。