0

通过对传入事件进行关联,我们可以根据指定的时间范围对过去的数据进行一些计算。 正如这里提到的

within我正在做这样的事情,通过使用关键字接收基于某个指定时间范围的事件。

@info(name='dg-start-check-query') from every (e1=InputStream) -> e2=InputStream[e1.meter_id=='s1' and e2.meter_id=='s1' and convert(e2.current_avg, 'double')-convert(e1.current_avg, 'double')==convert(e2.current_avg, 'double') and convert(e1.current_avg, 'double')+convert(e2.current_avg, 'double')!=0 and convert(e1.current_avg, 'double')==0] within 1 min select time:dateFormat(e2.device_time, 'HH:mm:ss.SSS yyyy-MM-dd') as date_time insert into OutputStreamStarted;

如何让此代码仅接收过去的 2 或 3 个事件并对其进行计算,就像使用收集事件一样window?此代码将接收给定时间范围内的所有事件,并在多个事件满足其条件时多次执行。

4

1 回答 1

0

您可以使用定义的长度窗口来实现此要求。对于每个传入事件,执行与窗口的单向连接,然后再放入窗口。你的条件可以进入条件加入。示例查询如下。您将需要处理您的逻辑条件,因为这些条件需要改进。

define stream InputStream (meter_id string, current_avg string, device_time int); 
define window eventWindow (meter_id string, current_avg string, device_time int) length(2); 

@info(name = 'query1') from InputStream[meter_id=='s1']#window.length(1) as e1 unidirectional join eventWindow as e2 
on (convert(e2.current_avg, 'double')-convert(e1.current_avg, 'double')==convert(e2.current_avg, 'double')) 
select e1.device_time insert into OutputStreamStarted ; 

@info(name = 'query2') from InputStream[meter_id=='s1'] 
select meter_id, current_avg, device_time 
insert into eventWindow ;
于 2018-12-29T17:51:49.743 回答