全面披露:我还在这里发布了这个问题的一个变体。
我有一个嵌入式设备作为加热系统的一部分,它每 5 秒通过一个 mosquitto MQTT 代理发布两个温度值,每个温度值到一个单独的 MQTT 主题。“mydevice/sensor1”是预热温度,“mydevice/sensor2”是后加热温度。这些值几乎在同一时间发布,因此两条消息之间的延迟通常不会超过半秒 - 但它们并不完全同步。
Telegraf 订阅了同一个代理,并且很乐意将这些测量结果放入一个名为“telegraf.autogen”的 InfluxDB 数据库中。测量都出现在一个名为“mqtt_consumer”的测量下,其中包含一个名为“value”的字段。在 InfluxDB 中,我可以通过使用“主题”标签过滤来区分主题标签值:
SELECT mean("value") AS "mean_value" FROM "telegraf"."autogen"."mqtt_consumer" WHERE time > now() - 1m AND "topic"='mydevice/sensor1' GROUP BY time(5s)
这一切似乎都正常工作。
我想要做的是计算这两个主题值之间的差异,对于每对传入的值,以计算温差并最终计算加热系统传递的能量(流速是恒定的且已知的)。我尝试在 Grafana 中使用 InfluxDB 查询来做到这一点,但它似乎很困难(我失败了),所以我想我会尝试使用 TICKscript 将我的流程分解为小步骤。
我一直在整理一个 TICKscript 来计算基于此示例的差异:
https://docs.influxdata.com/kapacitor/v1.3/guides/join_backfill/#stream-method
但是在我的情况下,我没有两个单独的测量值。相反,我使用主题标签作为过滤器,从单个“mqtt_consumer”测量中创建两个单独的流。然后我尝试以 1s 的容差加入这些(值总是及时发布得足够近)。我httpOut
用来生成调试视图(除此之外:这仅每 10 秒更新一次,缺少每秒的值,即使我的流以 5 秒的间隔运行 - 为什么?我可以在新数据库中看到这些值是虽然都在场)。
一旦我将它们加入,我将评估值的差异,并将其存储在一个名为“diff”的测量下的新数据库中。
到目前为止,这是我的脚本:
var sensor1 = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('mqtt_consumer')
.where(lambda: "topic" == 'mydevice/sensor1')
.groupBy(*)
|httpOut('sensor1')
var sensor2 = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('mqtt_consumer')
.where(lambda: "topic" == 'mydevice/sensor2')
.groupBy(*)
|httpOut('sensor2')
sensor1
|join(sensor2)
.as('value1', 'value2')
.tolerance(1s)
|httpOut('join')
|eval(lambda: "sensor1.value1" - "sensor1.value2")
.as('diff')
|httpOut('diff')
|influxDBOut()
.create()
.database('mydb')
.retentionPolicy('myrp')
.measurement('diff')
不幸的是,我的脚本无法通过join
节点传递任何项目。在kapacitor show
我可以看到httpOut
节点都将项目传递给join
节点,但它没有传递任何东西。kapacitor 日志也没有显示任何明显的内容。httpOut('join')
用于返回的 HTTP GET :
{"series":null}
我有两个问题:
- 这种使用带有 TICKscript 的 Kapacitor 来根据单个测量中两个值之间的差异计算能量的方法有效吗?还是有更好/更简单的方法来做到这一点?
- 为什么
join
节点不产生任何输出?我能做些什么来进一步调试呢?