1

全面披露:我还在这里发布了这个问题的一个变体。

我有一个嵌入式设备作为加热系统的一部分,它每 5 秒通过一个 mosquitto MQTT 代理发布两个温度值,每个温度值到一个单独的 MQTT 主题。“mydevice/sensor1”是预热温度,“mydevice/sensor2”是后加热温度。这些值几乎在同一时间发布,因此两条消息之间的延迟通常不会超过半秒 - 但它们并不完全同步。

Telegraf 订阅了同一个代理,并且很乐意将这些测量结果放入一个名为“telegraf.autogen”的 InfluxDB 数据库中。测量都出现在一个名为“mqtt_consumer”的测量下,其中包含一个名为“value”的字段。在 InfluxDB 中,我可以通过使用“主题”标签过滤来区分主题标签值:

SELECT mean("value") AS "mean_value" FROM "telegraf"."autogen"."mqtt_consumer" WHERE time > now() - 1m AND "topic"='mydevice/sensor1' GROUP BY time(5s)

这一切似乎都正常工作。

我想要做的是计算这两个主题值之间的差异,对于每对传入的值,以计算温差并最终计算加热系统传递的能量(流速是恒定的且已知的)。我尝试在 Grafana 中使用 InfluxDB 查询来做到这一点,但它似乎很困难(我失败了),所以我想我会尝试使用 TICKscript 将我的流程分解为小步骤。

我一直在整理一个 TICKscript 来计算基于此示例的差异:

https://docs.influxdata.com/kapacitor/v1.3/guides/join_backfill/#stream-method

但是在我的情况下,我没有两个单独的测量值。相反,我使用主题标签作为过滤器,从单个“mqtt_consumer”测量中创建两个单独的流。然后我尝试以 1s 的容差加入这些(值总是及时发布得足够近)。我httpOut用来生成调试视图(除此之外:这仅每 10 秒更新一次,缺少每秒的值,即使我的流以 5 秒的间隔运行 - 为什么?我可以在新数据库中看到这些值是虽然都在场)。

一旦我将它们加入,我将评估值的差异,并将其存储在一个名为“diff”的测量下的新数据库中。

到目前为止,这是我的脚本:

var sensor1 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor1')
        .groupBy(*)
    |httpOut('sensor1')

var sensor2 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor2')
        .groupBy(*)
    |httpOut('sensor2')

sensor1
    |join(sensor2)
        .as('value1', 'value2')
        .tolerance(1s)
    |httpOut('join')
    |eval(lambda: "sensor1.value1" - "sensor1.value2")
        .as('diff')
    |httpOut('diff')
    |influxDBOut()
        .create()
        .database('mydb')
        .retentionPolicy('myrp')
        .measurement('diff')

不幸的是,我的脚本无法通过join节点传递任何项目。在kapacitor show我可以看到httpOut节点都将项目传递给join节点,但它没有传递任何东西。kapacitor 日志也​​没有显示任何明显的内容。httpOut('join')用于返回的 HTTP GET :

{"series":null}

我有两个问题:

  1. 这种使用带有 TICKscript 的 Kapacitor 来根据单个测量中两个值之间的差异计算能量的方法有效吗?还是有更好/更简单的方法来做到这一点?
  2. 为什么join节点不产生任何输出?我能做些什么来进一步调试呢?
4

1 回答 1

0

尝试在两个传感器中添加 |mean 节点来计算场的平均值:

var sensor1 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor1')
        .groupBy(*)
    |mean('field1')
    |httpOut('sensor1')

加入后,您应该对流使用新分配的名称,也不要使用原始名称:

sensor1
    |join(sensor2)
        .as('value1', 'value2')
        .tolerance(1s)
    |httpOut('join')
    |eval(lambda: "value1.field1" - "value2.field2")
        .as('diff')
    |httpOut('diff')
    |influxDBOut()
        .create()
        .database('mydb')
        .retentionPolicy('myrp')
        .measurement('diff')

平均字段是根据我之前的评论计算的字段。试试看!

此外,为了进一步调试,请尝试将日志节点添加到您想要关注的位置。

希望这可以帮助!问候

于 2018-02-27T11:52:34.897 回答