我目前正在尝试为 Grafana 创建一个流式数据源插件。为此,我改编了官方说明中的代码,以便将来自 MQTT 主题的数据传递给 Grafana。这是带有定时输出的官方工作演示代码frame
:
Grafana 指南中的代码
query(options: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
const streams = options.targets.map(target => {
const query = defaults(target, defaultQuery);
return new Observable<DataQueryResponse>(subscriber => {
const frame = new CircularDataFrame({
append: 'tail',
capacity: 1000,
});
frame.refId = query.refId;
frame.addField({ name: 'time', type: FieldType.time });
frame.addField({ name: 'value', type: FieldType.number });
const intervalId = setInterval(() => {
frame.add({ time: Date.now(), value: Math.random() });
subscriber.next({
data: [frame],
key: query.refId,
});
}, 100);
return () => {
clearInterval(intervalId);
};
});
});
return merge(...streams);
}
与我的 Mosquitto 经纪人的连接以及数据的提交和接收工作得很好。有问题的是将接收到的数据通过返回CircularDataFrame
.
正如这个答案中所建议的,我使用 rxjsbindCallback
函数来观察消息事件处理程序。这样做时,消息回调函数中的控制台输出会为每条传入消息正确打印。而clientOnObs
实例订阅中的控制台输出仅在第一条消息上触发,之后不再触发。但是,应该是订阅中的控制台输出也会在每条传入消息上触发,这样我就可以展开CircularDataFrame
然后将其通过 传递给 Grafana Dashboard subscriber.next()
。bindCallback()
我已经尝试交换and的顺序this.mqttClient.on()
,但是在该subscribe
部分中根本没有输出任何消息。此外,我尝试将subscriber.next()
呼叫直接放入this.mqttClient.on('message')
回调,也没有成功。
目标是subscriber.next()
触发每条传入消息,以将新数据传递到仪表板。我如何需要修改我的query
方法实现来实现这一点?
我改编的代码
query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
const streams = request.targets.map(target => {
const query = defaults(target, defaultQuery);
return new Observable<DataQueryResponse>(subscriber => {
const frame = new CircularDataFrame({
append: 'tail',
capacity: 1000,
});
frame.refId = query.refId;
frame.addField({ name: 'time', type: FieldType.time });
frame.addField({ name: 'value', type: FieldType.number });
const clientOnObs = bindCallback(this.mqttClient.on).bind(this.mqttClient);
this.mqttClient.on('message', (topic: string, message: any) => {
// this is printed correctly with every message
console.log(topic, JSON.parse(message.toString()));
});
return () => {
clientOnObs('message').subscribe((payload: any[]) => {
// const topic: string = payload[0];
const message = JSON.parse(payload[1].toString());
// This is only printed on the first incoming message
console.log(message);
frame.add({ time: Math.floor(message.time) * 1000, value: message.temperature });
// The frame is not passed on at all
subscriber.next({
data: [frame],
key: query.refId,
});
});
};
});
});
return merge(...streams);
}