0

我有一个 siddhi 查询,通过使用时间批处理窗口来获取一分钟内的事件总数。使用输出流,我正在更新一个条形图,其中包含恒定的未来值、x 轴上的时间戳(日期和时间,直到分钟)和 y 轴上的事件计数。

但是有时一分钟内的事件数量需要太长时间才能传输,因此查询不会给出正确的结果。

例如,如果我总共获得 60 个事件,并且此查询首先给我 40 的计数,它显示在条形图中,但一分钟后它的值更改为 20,根据逻辑这是正确的,但我担心是否有办法我可以为任何以前的时间戳(在这种情况下为 40+20)更新流和条形图,并将下一个即将到来的时间戳的新值插入其中。

我已经看到更新功能用于表而不是流,是这样吗?而且我还想要 2 个输出流从同一个输入流中填充两个不同的条形图。那么下面的查询是否正确?

查询是:

/* Enter a unique ExecutionPlan */
@Plan:name('FTPExecutionPlan')

/* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan')

/* define streams/tables and write queries here ... */

@Import('FTPInStream:1.0.0')
define stream FTPInStream (ts string, uid string, id_orig_h string, id_orig_p int, id_resp_h string, id_resp_p int, user string, password string,command string, arg string, mime_type string, file_size string, reply_code int, reply_msg string);

@Export('FTPIPOutStream:1.0.0')
define stream FTPIPOutStream (ip_address string, ftp_requests int);

@Export('FTPOutStream:1.0.0')
define stream FTPOutStream (ts string, ftp_requests int);

from FTPInStream
select time:dateFormat(str:replaceAll(ts,'T',' '),'yyyy-MM-dd HH:mm', 'yyyy-MM-dd HH:mm:ss') as ts, uid, id_orig_h, id_orig_p, id_resp_h, id_resp_p
insert into intermediateStream;

from intermediateStream#window.timeBatch(1 min)
select ts, cast(count(ts), 'int') as ftp_requests
group by ts
insert into FTPOutStream;

from intermediateStream#window.timeBatch(1 min)
select id_orig_h as ip_address, cast(count(id_orig_h), 'int') as ftp_requests
group by id_orig_h
insert into FTPIPOutStream;
4

1 回答 1

0

但是有时一分钟内的事件数量需要太长时间才能传输,因此查询不会给出正确的结果。

在上述情况下,您可能应该使用externalTimeBatch窗口,而不是 timeBatch 窗口。然后它将处理相对于事件时间戳而不是实际时间的事件批处理。

我已经看到更新功能用于表而不是流,是这样吗?

是的。只有事件表具有删除/更新功能。事件流没有该功能。

而且我还想要 2 个输出流从同一个输入流中填充两个不同的条形图。那么下面的查询是否正确?

是的。由于您同时导出 FTPOutStream 和 FTPIPOutStream,您可以使用它们来填充两个不同的条形图。

于 2016-08-07T05:17:47.290 回答