1

尝试创建 Kinesis Analytics 查询,以在流程中的某个步骤耗时过长(或终止且未继续)时发出警报。

随着多步骤过程一步一步地进行,我有大量包含状态更新的数据。我正在尝试编写一个查询,该查询可以识别下一步何时在特定时间内没有发生(也就是超时)。具体来说,我想知道一个 ProcessID 何时不会在 5 分钟内从“已启动”变为“正在运行”。

我知道如何在数据库中执行此操作,但是当时间尺度不断变化时,它会变得混乱。非常感谢您提供的任何帮助!

我的事件具有三个属性:
ProcessID - 整数
状态 - 字符串(“开始”、“运行”或“完成”)
HappenedOn - 日期时间(例如 2017-10-02 15:17:00)

我将如何在数据库中执行此操作(非 Kinesis)

在 SQL 中,我会使用 LEFT OUTER JOIN 将事件表连接到自身,但无法弄清楚如何在实时查询情况下执行此操作。

#This will show me the start events that don't have a corresponding 'running' event

SELECT * FROM events as F 
LEFT OUTER JOIN events as S on F.PROCESSID = S.PROCESSID AND S.STATUS = 'running'
WHERE  F.STATUS = 'start' AND S.STATUS IS NULL;

到目前为止 Kinesis 中的解决方案
此查询保存并运行,但没有给我我正在寻找的内容。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (E1PROCESSID integer, 
E1STATUS varchar(7), E1HAPPENED varchar(32), E2PROCESSID integer, 
E2STATUS varchar(7), E2HAPPENED varchar(32) );

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"

SELECT F.PROCESSID, F.STATUS, F.HAPPENED, S.PROCESSID, S.STATUS, S.HAPPENED
FROM "SOURCE_SQL_STREAM_001" OVER (RANGE INTERVAL '5' MINUTE PRECEDING) AS F 
LEFT OUTER JOIN "SOURCE_SQL_STREAM_001"  AS S
ON F.PROCESSID = S.PROCESSID AND S.STATUS = 'running'
WHERE F.STATUS = 'start' AND S.STATUS IS NULL;

即使我可以使上述查询正常工作,我也需要 Kinesis 仅在 HAPPENED 值 5 分钟后查找相应的事件(或缺少它们)(例如,需要在当前日期时间和 HAPPENED 之间进行 DATEDIFF)。任何有关如何添加此内容的建议将不胜感激。

另外,我觉得我需要使用 FOLLOWING 而不是 PRECEDING,但 SQL 解析器不会让我这样做(我知道为什么)。我也对将 OVER 窗口添加到...LEFT 的哪个流加入感到困惑?正确的?两个都?

提前谢谢了。

4

1 回答 1

0

您可以通过创建以下规则使用 Drools 执行此操作:

declare EventA
  @role( event )
end

declare EventB
  @role( event ) 
end

rule "Timeout EventA"
when
  $a : EventA()
  not(exists(EventB(this after[0,5m] $a)))
then
  insertLogical(new TimeoutA($a.id));
end

您可以使用此服务创作 Drools Kinesis Analytics

于 2017-10-05T20:34:46.690 回答