您可以执行以下操作:
define stream webvisit (idClient string, idProduct string, chanel string)
from visits[productId =='Fondos’]#window.time(4 days)
select idClient, idProduct, chanel, count(idClient) as visitCount
group by idClient
insert into visits;
from visits[visitCount > 2]
select *
insert into resultStream;
在第二个查询中,我们获取每个客户在过去 4 天内的访问计数,在最后一个查询中,我们过滤计数 > 2 的结果。
编辑:
由于只有在最后一天未发送通知时才需要发送通知(假设它定义为:当前时间 - 24 小时),您可以尝试以下操作:
define stream webvisit (idClient string, idProduct string, chanel string);
from webvisit[idProduct == 'Fondos']#window.time(4 days)
select idClient, idProduct, chanel, count(idClient) as visitCount
group by idClient insert into visits for current-events;
from visits[visitCount > 2]#window.time(1 day)
select idClient, idProduct, chanel, count(idClient) as hitsForClientPerDay
insert into tempStream;
from tempStream[hitsForClientPerDay < 2]
select idClient, idProduct, chanel, 'your custom message here' as advertisement
insert into advertisementStream;
第二个(1 天窗口)查询跟踪过去 24 小时内生成了多少警报('hitsForClientPerDay'),最后一个查询仅在该期间没有任何警报时才发送广告(注意 hitsForClientPerDay事件发生时将为 1,因为当前事件也被考虑用于 count(),因此我们将其检查为 < 2)。