我想以连续的方式计算出现在网络流量中的协议的百分比,以便这些百分比不断更新新事件。将生成一个饼图并使用百分比进行更新。由于我需要新的和以前的数据进行计算,我决定使用内存表来将事件保存更长时间(比如一天)。
由于事件表仅在与事件流连接时才可用,因此我也选择了外连接来获取旧值。只对协议及其百分比感兴趣,我只需要两列,但我无法在外连接中应用聚合函数。到目前为止,我生成的查询是:
@Import('MAINInStream:1.0.0')
define stream MAINInStream (ts string, uid string, id_orig_h string, id_orig_p int, id_resp_h string, id_resp_p int, proto string, service string, duration double, orig_bytes long, resp_bytes long, conn_state string, local_orig bool, local_resp bool, missed_bytes long, history string, orig_pkts long, orig_ip_bytes long, resp_pkts long, resp_ip_bytes long, tunnel_parents string, sensorname string);
@Export('ProtocolStream:1.0.0')
define stream ProtocolStream (protocol string, count int);
define table mem_conn_table (timestamp long, id_orig_h string, id_orig_p int, id_resp_h string, id_resp_p int, proto string);
from MAINInStream
select time:timestampInMilliseconds(time:dateAdd(str:replaceAll(ts,'T',' '), 5, 'hour',"yyyy-MM-dd HH:mm:ss"),'yyyy-MM-dd HH:mm') as timestamp, id_orig_h, id_orig_p, id_resp_h, id_resp_p, proto
insert into intermediateStream;
from MAINInStream
select time:timestampInMilliseconds(time:dateAdd(str:replaceAll(ts,'T',' '), 5, 'hour',"yyyy-MM-dd HH:mm:ss"),'yyyy-MM-dd HH:mm') as timestamp, id_orig_h, id_orig_p, id_resp_h, id_resp_p, proto
group by id_resp_p
insert into mem_conn_table;
from intermediateStream#window.externalTimeBatch(timestamp,1min, timestamp, 1min) as i right outer join mem_conn_table[time:dateDiff(time:currentTimestamp(),cast(timestamp,"string"), "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd HH:mm:ss") == 0] as mc
on i.timestamp == mc.timestamp
SELECT (ifThenElse(mc.id_resp_p == 21,'FTP', ifThenElse(mc.id_resp_p == 22,'SSH', ifThenElse(mc.id_resp_p == 25,'SMTP', ifThenElse(mc.id_resp_p == 445,'SMB','MYSQL'))))) as protocol , cast(count(mc.id_resp_p),'int') as count
insert into ProtocolStream;
我正在用一分钟的外部时间批处理窗口,然后获取协议及其计数,但它没有给我任何输出。
有什么建议么?