0

我正在尝试通过从 InfluxDB 1、2、3 和 4 周前查询相同的时间间隔来使用 Kapacitor 批处理|查询生成基线,然后将其向前移动并像这样连接在一起:

var w1 = batch
    |query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
        .offset(1w).period(period).every(1m).align().groupBy(time(1m))
    |shift(1w)

var w2 = batch
    |query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
        .offset(2w).period(period).every(1m).align().groupBy(time(1m))
    |shift(2w)

var w3 = batch
    |query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
        .offset(3w).period(period).every(1m).align().groupBy(time(1m))
    |shift(3w)

var w4 = batch
    |query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
        .offset(4w).period(period).every(1m).align().groupBy(time(1m))
    |shift(4w)

var bj = w1
    |join(w2, w3, w3)
        .as('w1', 'w2', 'w3', 'w4')
        .fill('null')

var b = bj
    |eval(lambda: (""w1.mean"" + ""w2.mean"" + ""w3.mean"" + ""w4.mean"") / float(4.0))
        .as('avg')

我正在使用完全外连接,因为有些星期可能会丢失一个值,在这种情况下,我会将基线计算为 3 个当前值的平均值。

但是,lambda 似乎不支持 Mean() 或任何此类数学函数。它似乎也不支持检查空值。

有没有办法像这样计算基线?

此外,一旦计算出基线,如何将其缓存起来,以便可以根据基线检查传入的流数据?

任何帮助表示赞赏!谢谢

4

3 回答 3

0

我最终使用 C# .NET Core 创建了一个 UDF,它作为一个进程并排运行,查询 InfluxDB,进行数学运算并缓存结果。

于 2019-09-13T18:43:27.477 回答
0

首先,尝试在批处理变量上使用偏移量而不是移位。

偏移量将从 x 之前的分钟、小时、天...

shift 节点应该用于连接过程,例如:

previous
    |shift(1w)
    |join(current)
    ......

这里有一个例子: https ://github.com/influxdata/kapacitor/issues/746

关于以不同的时间加入 4 个不同的流,并且由于我之前的评论,我想它不会起作用......也许使用联合而不是加入节点可以工作,但不确定!

你总是可以有 3 个刻度,检查当前到过去一周,2 周等等......

于 2018-02-27T12:18:25.523 回答
0

“因为几个星期可能会丢失一个值” - join 将永远等待这个值,不会发出其他相应的批次并导致内存泄漏。

|barrier() 节点可能有助于解决泄漏,但您仍然会在尝试访问缺失的点属性时遇到 eval 错误。

您想将其拆分为多个脚本,例如计算所有 4 个周期的脚本,为每个脚本添加一些标签,例如

|default().tag('stream', 'w1')

并将它们发送到

|kapacitorLoopback()

第二个脚本监听你的环回流,|window() 所有到达点没有分组并计算 |mean("mean") 无论它实际得到多少个周期。

于 2019-04-26T10:12:43.413 回答