我有一个 siddhi 查询,通过使用时间批处理窗口来获取一分钟内的事件总数。使用输出流,我正在更新一个条形图,其中包含恒定的未来值、x 轴上的时间戳(日期和时间,直到分钟)和 y 轴上的事件计数。
但是有时一分钟内的事件数量需要太长时间才能传输,因此查询不会给出正确的结果。
例如,如果我总共获得 60 个事件,并且此查询首先给我 40 的计数,它显示在条形图中,但一分钟后它的值更改为 20,根据逻辑这是正确的,但我担心是否有办法我可以为任何以前的时间戳(在这种情况下为 40+20)更新流和条形图,并将下一个即将到来的时间戳的新值插入其中。
我已经看到更新功能用于表而不是流,是这样吗?而且我还想要 2 个输出流从同一个输入流中填充两个不同的条形图。那么下面的查询是否正确?
查询是:
/* Enter a unique ExecutionPlan */
@Plan:name('FTPExecutionPlan')
/* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan')
/* define streams/tables and write queries here ... */
@Import('FTPInStream:1.0.0')
define stream FTPInStream (ts string, uid string, id_orig_h string, id_orig_p int, id_resp_h string, id_resp_p int, user string, password string,command string, arg string, mime_type string, file_size string, reply_code int, reply_msg string);
@Export('FTPIPOutStream:1.0.0')
define stream FTPIPOutStream (ip_address string, ftp_requests int);
@Export('FTPOutStream:1.0.0')
define stream FTPOutStream (ts string, ftp_requests int);
from FTPInStream
select time:dateFormat(str:replaceAll(ts,'T',' '),'yyyy-MM-dd HH:mm', 'yyyy-MM-dd HH:mm:ss') as ts, uid, id_orig_h, id_orig_p, id_resp_h, id_resp_p
insert into intermediateStream;
from intermediateStream#window.timeBatch(1 min)
select ts, cast(count(ts), 'int') as ftp_requests
group by ts
insert into FTPOutStream;
from intermediateStream#window.timeBatch(1 min)
select id_orig_h as ip_address, cast(count(id_orig_h), 'int') as ftp_requests
group by id_orig_h
insert into FTPIPOutStream;