-1

我目前正在尝试为 Grafana 创建一个流式数据源插件。为此,我改编了官方说明中的代码,以便将来自 MQTT 主题的数据传递给 Grafana。这是带有定时输出的官方工作演示代码frame

Grafana 指南中的代码

query(options: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
  const streams = options.targets.map(target => {
    const query = defaults(target, defaultQuery);

    return new Observable<DataQueryResponse>(subscriber => {
      const frame = new CircularDataFrame({
        append: 'tail',
        capacity: 1000,
      });

      frame.refId = query.refId;
      frame.addField({ name: 'time', type: FieldType.time });
      frame.addField({ name: 'value', type: FieldType.number });

      const intervalId = setInterval(() => {
        frame.add({ time: Date.now(), value: Math.random() });

        subscriber.next({
          data: [frame],
          key: query.refId,
        });
      }, 100);

      return () => {
        clearInterval(intervalId);
      };
    });
  });

  return merge(...streams);
}

与我的 Mosquitto 经纪人的连接以及数据的提交和接收工作得很好。有问题的是将接收到的数据通过返回CircularDataFrame.

正如这个答案中所建议的,我使用 rxjsbindCallback函数来观察消息事件处理程序。这样做时,消息回调函数中的控制台输出会为每条传入消息正确打印。而clientOnObs实例订阅中的控制台输出仅在第一条消息上触发,之后不再触发。但是,应该是订阅中的控制台输出也会在每条传入消息上触发,这样我就可以展开CircularDataFrame然后将其通过 传递给 Grafana Dashboard subscriber.next()bindCallback()我已经尝试交换and的顺序this.mqttClient.on(),但是在该subscribe部分中根本没有输出任何消息。此外,我尝试将subscriber.next()呼叫直接放入this.mqttClient.on('message')回调,也没有成功。

目标是subscriber.next()触发每条传入消息,以将新数据传递到仪表板。我如何需要修改我的query方法实现来实现这一点?

我改编的代码

query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
    const streams = request.targets.map(target => {
      const query = defaults(target, defaultQuery);

      return new Observable<DataQueryResponse>(subscriber => {
        const frame = new CircularDataFrame({
          append: 'tail',
          capacity: 1000,
        });

        frame.refId = query.refId;
        frame.addField({ name: 'time', type: FieldType.time });
        frame.addField({ name: 'value', type: FieldType.number });

        const clientOnObs = bindCallback(this.mqttClient.on).bind(this.mqttClient);

        this.mqttClient.on('message', (topic: string, message: any) => {
          // this is printed correctly with every message
          console.log(topic, JSON.parse(message.toString()));
        });

        return () => {
          clientOnObs('message').subscribe((payload: any[]) => {
            // const topic: string = payload[0];
            const message = JSON.parse(payload[1].toString());
            
            // This is only printed on the first incoming message
            console.log(message);

            frame.add({ time: Math.floor(message.time) * 1000, value: message.temperature });
            
            // The frame is not passed on at all
            subscriber.next({
              data: [frame],
              key: query.refId,
            });
          });
        };
      });
    });

    return merge(...streams);
  }
4

1 回答 1

1

bindCallback是一个用于将基于回调的函数转换为 Obaservable 的函数,它发出一次然后完成。因此,这可能是仅针对第一条消息触发订阅的原因。

如果我查看您的代码,我会尝试执行以下操作

query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
    const streams = request.targets.map(target => {
      const query = defaults(target, defaultQuery);

      return new Observable<DataQueryResponse>(subscriber => {
        const frame = new CircularDataFrame({
          append: 'tail',
          capacity: 1000,
        });

        frame.refId = query.refId;
        frame.addField({ name: 'time', type: FieldType.time });
        frame.addField({ name: 'value', type: FieldType.number });

        this.mqttClient.on('message', (topic: string, message: any) => {
          // call next on the subscriber here
          const message = JSON.parse(payload[1].toString());
          frame.add({ time: Math.floor(message.time) * 1000, value: message.temperature });
          subscriber.next({
            data: [frame],
            key: query.refId,
          });
        });

        return () => {
          // place here any code that has to run to clean up when the Observable
          // completes. In the graphana official example, for instance, this is 
          // where the interval is cleared
        };
      });
    });

    return merge(...streams);
  } 
于 2021-03-15T15:08:30.153 回答