0

我正在收集具有不同 ID 的事件,传入事件中有 n 种固定 ID。我想根据时间范围或不收集过去事件的平均值。不同类型的 id 之间的事件。假设有 2 个设备发送数据/事件,ID 为“a”和“b”。我想获得两个设备过去 5 分钟数据的平均值,然后比较两个平均值以做出一些决定。

通过这段代码,我正在收集过去 n 分钟的数据并存储在 2 个窗口中。`

@source(type='http', receiver.url='http://localhost:5007/SweetProductionEP', @map(type = 'json'))
define stream InProduction(name string, amount int);


define window hold_a(avg_amount double) length(1);

define window hold_b(avg_amount double) length(1);


from InProduction[name=='a']#window.timeBatch(5 min)
select avg(amount) as avg_amount
group by name
insert into hold_a;

from InProduction[name=='b']#window.timeBatch(5 min)
select avg(amount) as avg_amount
group by name
insert into hold_b;`

窗口hold_a 和hold_b 将获得过去5 分钟的平均数据。现在我想比较两个窗口的数据并做出决定。

我已经尝试在两个窗口上加入,但加入查询没有被执行。

4

1 回答 1

1

你必须使用一种模式来实现这一点。下面的查询将具有最高平均值的名称输出到highestAvgStream。

@source(type='http', receiver.url='http://localhost:5007/SweetProductionEP', @map(type = 'json'))
define stream InProduction(name string, amount int);

from InProduction[name=='a']#window.timeBatch(5 min)
select avg(amount) as avg_amount, name
insert into avgStream;

from InProduction[name=='b']#window.timeBatch(5 min)
select avg(amount) as avg_amount, name
insert into avgStream;`

from every(e1=avgStream -> e2=avgStream)
select ifthenelse(e1.avg_amount>e2.avg_amount,e1.name,e2.name) as highestAvgName
insert into HighestAvgStream;
于 2019-01-05T07:18:43.123 回答