0

我正在尝试在 Splunk 中查找无序事件。这个例子类似于我们系统中发生的事情:

...
Time=15:40.09 Id=11 ScenarioId=7 Event=BlockChange Block=A-A
Time=15:40.11 Id=12 ScenarioId=7 Event=BlockChangeConfirmed Block=1-7
Time=15:41.00 Id=13 ScenarioId=2 Event=BlockChange Block=E-6
Time=15:41.01 Id=14 ScenarioId=7 Event=Restart
Time=15:41.05 Id=15 ScenarioId=2 Event=BlockChangeConfirmed Block=B-2
Time=15:41.14 Id=16 ScenarioId=3 Event=BlockChangeConfirmed Block=2-4
Time=15:41.15 Id=17 ScenarioId=2 Event=Restart
Time=15:41.16 Id=18 ScenarioId=3 Event=BlockChange Block=E-1
Time=15:43.24 Id=19 ScenarioId=8 Event=BlockChange Block=C-7
Time=15:43.27 Id=20 ScenarioId=8 Event=BlockChangeConfirmed Block=D-2
Time=15:43.35 Id=21 ScenarioId=8 Event=BlockChange Block=D-2
Time=15:43.40 Id=22 ScenarioId=8 Event=BlockChangeConfirmed Block=4-A
...

在特定场景发生 BlockChange 事件后,需要在分配新 Block 之前对其进行确认。虽然偶尔会在 BlockChange 事件之前收到 BlockChangeConfirmed 事件(参见 Ids 16 和 18)。我正在尝试编写一个查询来识别这些无序事件。

我的方法是只考虑 Block* 事件,按 ScenarioId 将它们分组为事务,从 BlockChangeConfirmed 开始,在 10 秒的时间跨度内以 BlockChange 结束。因此,从逻辑的角度来看,它会将上述数据转换为:

ScenarioId=3: Time=15:41.14 Id=16 Event=BlockChangeConfirmed Block=2-4
              Time=15:41.16 Id=18 Event=BlockChange Block=E-1
ScenarioId=8: Time=15:43.27 Id=20 Event=BlockChangeConfirmed Block=D-2
              Time=15:43.35 Id=21 Event=BlockChange Block=D-2

这正确识别了 ID 16 和 18,但也错误地识别了 20 和 21。所以我需要包含 Block 字段,因为它总是在相应的 BlockChange 和 BlockChangeConfirmed 事件之间变化。

不过,我无法在 Splunk 中表达这一点。根据我使用的查询,它要么返回每个BlockChangeConfirmed 事件,要么不识别 Block 字段何时更改。

这是我正在使用的基本查询:

index="foo" sourcetype="bar" Block AND (Event=BlockChange OR Event=BlockChangeConfirmed)
| streamstats latest(Block) AS last earliest(Block) AS first 
| transaction ScenarioId startswith="(Event=BlockChangeConfirmed)" endswith="(Event=BlockChange)" maxspan=10s

我尝试将条件添加和更改为:

| search first != last

| search eval(first != last)

| where first != last

endswith=eval(first != last)

没有成功。

我怎样才能得到我想要的结果?

4

1 回答 1

0

对于这个特定的示例,我通过使用 mvcount 搜索块更改数大于 1 的位置来解决它:

index="foo" sourcetype="bar" Block AND (Event=BlockChange OR Event=BlockChangeConfirmed)
| streamstats earliest(Block) AS first 
| transaction ScenarioId startswith="(Event=BlockChangeConfirmed)" endswith="(Event=BlockChange)" maxspan=10s
| eval numberOfBlocks=mvcount(first)
| search numberOfBlocks > 1
于 2018-04-03T15:02:59.743 回答