0

我试图让 Siddhi 在检测到航班进入地理围栏时触发事件,但无法确定正确的查询来执行此操作。

我有以下输入流定义:

define stream GeofenceMulticasterConsumerStream ( journeyId string, geofenceId string, withinGeofence bool, timestamp long )

每次我获得航班的位置更新时,我都会在此流中为系统中的每个地理围栏生成一个事件(大约有 10 个地理围栏,因此认为 Siddhi 能够处理 10 * 数量的位置更新事件)

这是我开始的查询:

define partition geofencePartition by GeofenceMulticasterConsumerStream.geofenceId;
from every a = GeofenceMulticasterConsumerStream[withinGeofence == false] ->
b = GeofenceMulticasterConsumerStream[a.journeyId == b.journeyId and b.withinGeofence == true]
within 300000
select b.journeyId, b.geofenceId, b.timestamp as timeEntered
insert into EnteredGeofenceStream
partition by geofencePartition

但是,这给了我重复的地理围栏条目事件,因为它评估每个“a”事件与每个匹配的“b”事件(如果我有 5 个不在地理围栏中的事件,然后是一个,我得到 5 个地理围栏条目事件)

所以我尝试添加一些重复事件检测来避免这种情况:

from every a = GeofenceMulticasterConsumerStream[withinGeofence == false] ->
b = GeofenceMulticasterConsumerStream[a.journeyId == b.journeyId and b.withinGeofence == true]
within 300000
select b.journeyId, b.geofenceId, b.timestamp as timeEntered, geofences:hashEntry(b.journeyId, b.geofenceId, b.timestamp) as entryHash
insert into DuplicateEnteredGeofenceStream
partition by geofencePartition

from DuplicateEnteredGeofenceStream#window.firstUnique(entryHash)
select journeyId, geofenceId, timeEntered
insert into EnteredGeofenceStream

geofences:hashEntry 是我创建的一个函数,它为入口事件生成一个唯一的哈希码。

但是,我并不热衷于这样做,因为您必须在 firstUnique 窗口中记录所有唯一哈希,并担心这会造成内存泄漏。似乎有点过头了,因为散列仅适用于该时间点,因此我只需要一个最多有效几秒钟的 firstUnique 窗口来检查重复项。

我认为我遇到的一个大问题是我有一个包含多个航班和多个地理围栏的流,因为我看到的所有示例都简单得多我想知道我是否正在尝试实现不可能的事情。

我非常感谢您对此提出的任何建议,因为我现在已经没有想法了!

提前致谢!

4

1 回答 1

0

由于您试图仅检测转换 a -> b,因此您可以在此处使用“序列”而不是模式。

在 Siddhi 中,当您使用序列时,它会匹配连续事件,中间没有任何其他事件。相反,模式允许其他事件存在于两者之间。

同样在这种情况下,由于多个航班可能同时进入同一个地理围栏,您必须按航班 ID(或旅程 ID)而不是地理围栏 ID 进行分区(否则,航班 X 和 Y 会有模式,例如 X_outside -> Y_outside -> X_inside -> Y_inside 不会被检测到)。这样,将始终正确检测到特定航班从 outside_fence -> inside_fence 的转换。

因此,一旦您使用正确的分区和序列,您就可以检测到转换,并且无需在此处使用独特的窗口等。

有关更多详细信息,请参阅此处有关序列的文档。

于 2014-10-28T11:00:18.243 回答