我试图让 Siddhi 在检测到航班进入地理围栏时触发事件,但无法确定正确的查询来执行此操作。
我有以下输入流定义:
define stream GeofenceMulticasterConsumerStream ( journeyId string, geofenceId string, withinGeofence bool, timestamp long )
每次我获得航班的位置更新时,我都会在此流中为系统中的每个地理围栏生成一个事件(大约有 10 个地理围栏,因此认为 Siddhi 能够处理 10 * 数量的位置更新事件)
这是我开始的查询:
define partition geofencePartition by GeofenceMulticasterConsumerStream.geofenceId;
from every a = GeofenceMulticasterConsumerStream[withinGeofence == false] ->
b = GeofenceMulticasterConsumerStream[a.journeyId == b.journeyId and b.withinGeofence == true]
within 300000
select b.journeyId, b.geofenceId, b.timestamp as timeEntered
insert into EnteredGeofenceStream
partition by geofencePartition
但是,这给了我重复的地理围栏条目事件,因为它评估每个“a”事件与每个匹配的“b”事件(如果我有 5 个不在地理围栏中的事件,然后是一个,我得到 5 个地理围栏条目事件)
所以我尝试添加一些重复事件检测来避免这种情况:
from every a = GeofenceMulticasterConsumerStream[withinGeofence == false] ->
b = GeofenceMulticasterConsumerStream[a.journeyId == b.journeyId and b.withinGeofence == true]
within 300000
select b.journeyId, b.geofenceId, b.timestamp as timeEntered, geofences:hashEntry(b.journeyId, b.geofenceId, b.timestamp) as entryHash
insert into DuplicateEnteredGeofenceStream
partition by geofencePartition
from DuplicateEnteredGeofenceStream#window.firstUnique(entryHash)
select journeyId, geofenceId, timeEntered
insert into EnteredGeofenceStream
geofences:hashEntry 是我创建的一个函数,它为入口事件生成一个唯一的哈希码。
但是,我并不热衷于这样做,因为您必须在 firstUnique 窗口中记录所有唯一哈希,并担心这会造成内存泄漏。似乎有点过头了,因为散列仅适用于该时间点,因此我只需要一个最多有效几秒钟的 firstUnique 窗口来检查重复项。
我认为我遇到的一个大问题是我有一个包含多个航班和多个地理围栏的流,因为我看到的所有示例都简单得多我想知道我是否正在尝试实现不可能的事情。
我非常感谢您对此提出的任何建议,因为我现在已经没有想法了!
提前致谢!