3

按照此处的 AWS 示例并引用balena.io 示例,我正在尝试获取“事物”(当前是我的 Mac 上的脚本)来更新 AWS 上的事物影子。

我越来越近了。到目前为止,我可以成功注册对事物影子的兴趣(更新:并订阅和发布到 MQTT 主题,接收更新)。但是,尝试更新阴影时出现超时。最初,由于缺少关于事物证书的策略,我在注册兴趣时遇到了超时,现在是基本的。我目前的想法是,也许我需要使用不同的根 CA 证书(当前使用提供的 CA1),或者我的 base64 编码证书字符串可能有问题,编码为:

openssl base64 -in some-cert.pem -out some-cert.txt
#gets copied to clipboard and pasted in UI env field
pbcopy < some-cert.txt

这是我到目前为止所拥有的。请注意 TODO 说明,因为这是一项正在进行的工作。

政策(目前过于宽松):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:*",
      "Resource": "*"
    }
  ]
}

故障排除.ts

// application endpoint
const AWS_ENDPOINT = "my-endpoint.iot.us-east-1.amazonaws.com";
//  thing private key encoded in base64
const AWS_PRIVATE_CERT = "gets set in balena.io UI as base64 string";
// aws root CA1 certificate encoded in base64
const AWS_ROOT_CERT = "gets set in balena.io UI as base64 string";
// thing certificate
const AWS_THING_CERT = "gets set in balena.io UI as base64 string";
const AWS_REGION = 'us-east-1';

import { DataUploader } from './data-uploader';
import { DataUploaderOptions } from './model/data-uploader-options';

const dataUploaderOptions: DataUploaderOptions = {
  awsEndpoint: AWS_ENDPOINT,
  awsPrivateCert: AWS_PRIVATE_CERT,
  awsCaCert: AWS_ROOT_CERT,
  awsThingCert: AWS_THING_CERT,
  awsRegion: AWS_REGION
}

const dataUploader = new DataUploader(dataUploaderOptions);

数据上传器.ts

/**
 * Upload image and meta data to AWS as an AWS IoT thing.
 */

import { thingShadow, ThingShadowOptions } from 'aws-iot-device-sdk';
import { DataUploaderOptions } from './model/data-uploader-options';
import { get } from 'lodash';

export class DataUploader {

  // Provided options via constructor parameter
  protected readonly options: DataUploaderOptions;

  // AWS IoT thing shadows
  protected thingShadows: thingShadow;
  protected thingName = 'TwinTigerSecurityCamera';
  protected currentTimeout: NodeJS.Timer;
  // TODO: Typescript wants MqttClient type, better than 'any', check if it makes sense to import here just for type reference.
  protected clientToken: string | void | any;

  // TEMP: Move to class options:
  protected operationTimeout = 10000;
  protected operationTimeoutRetry = 1000;

  constructor(options: DataUploaderOptions) {
    // Set options based on input or defaults if not provided.
    this.options = {
      awsEndpoint: get(options, 'awsEndpoint', ''), // empty will throw an error
      awsPrivateCert: get(options, 'awsPrivateCert', ''), // empty will throw an error
      awsCaCert: get(options, 'awsCaCert', ''), // empty will throw an error
      awsThingCert: get(options, 'awsThingCert', ''), // empty will throw an error
      awsRegion: get(options, 'awsRegion', ''), // empty will throw an error
      awsMqttClientId: get(options, 'awsMqttClientId', this.createUniqueClientId())
    };

    // Proceed no further if AWS options are not set properly.
    if (!this.options.awsEndpoint ||
        !this.options.awsPrivateCert ||
        !this.options.awsCaCert ||
        !this.options.awsThingCert ||
        !this.options.awsRegion) {
      throw new Error('DataUploader constructor: AWS IoT options are required.');
    }

    // setup thingShadow and events
    this.initThingShadow();
  }

  /**
   * 
   */
  protected initThingShadow = () => {
    // create thing shadow: extends IoT 'device' with extra status that can be
    // set/received online or offline (can sync when online)
    const thingShadowOptions: ThingShadowOptions = {
      // TODO: revise 'ca' to 'rootCa1' to help clarify
      clientCert: Buffer.from(this.options.awsThingCert, 'base64'),
      caCert: Buffer.from(this.options.awsCaCert, 'base64'),
      privateKey: Buffer.from(this.options.awsPrivateCert, 'base64'),
      clientId: this.options.awsMqttClientId,
      host: this.options.awsEndpoint,
      region: this.options.awsRegion
    };
    this.thingShadows = new thingShadow(thingShadowOptions);

    this.thingShadows.on('connect', () => {
      console.log('connected');
      this.registerThingShadowInterest();
    });

    // Report the status of update(), get(), and delete() calls.
    this.thingShadows.on('status', (thingName, stat, clientToken, stateObject) => {
      const tmpObj = JSON.stringify(stateObject);
      console.log(`received ${stat} on ${thingName}: ${tmpObj}`);
    });

    this.thingShadows.on('message', (topic, message) => {
      const tmpObj = JSON.stringify(message); 
      console.log(`message received for ${topic}: ${tmpObj}`);
    });

    this.thingShadows.on('foreignStateChange', (thingName, operation, stateObject) => {
      const tmpObj = JSON.stringify(stateObject); 
      console.log(`foreignStateChange happened for ${thingName}, ${operation}: ${tmpObj}`);
    });

    this.thingShadows.on('delta', (thingName, stateObject) => {
      const tmpObj = JSON.stringify(stateObject); 
      console.log(`received delta on ${thingName}: ${tmpObj}`);
    });

    this.thingShadows.on('timeout', (thingName, clientToken) => {
      console.log(`timeout for ${thingName}: ${clientToken}`);
    });
  }

  /**
   * 
   */
  protected setInitialThingShadowState = (): void => {
    // TODO: consider making interface for this
    const cameraState = JSON.stringify({
      state: {
        desired: {
          signedUrlRequests: 10,
          signedUrls: [],
          startedAt: new Date(),
          uploadCount: 0,
          uploadSpeed: 0
        }
      }
    });

    this.thingShadowOperation('update', cameraState);
  }

  /**
   * 
   */
  protected registerThingShadowInterest = () => {
    this.thingShadows.register(this.thingName, { 
      ignoreDeltas: true
    },
    (err, failedTopics) => {
      if (!err && !failedTopics) {
        console.log(`${this.thingName} interest is registered.`);
        this.setInitialThingShadowState();
      } else {
        // TODO: What do we do now? Throw an error?
        const failedString = JSON.stringify(failedTopics);
        console.error(`registerThingShadowInterest error occurred: ${err.message}, failed topics: ${failedString}`);
      }
    });
  }

  /**
   * Thanks: https://github.com/aws/aws-iot-device-sdk-js/blob/master/examples/thing-example.js
   */
  protected thingShadowOperation = (operation: string, state: Object) => {
    // TODO: Check if there's a better way to do this. We want to accept operation
    // parameter as string only (no any), then ensure it's a key of the thingShadow
    // class. It works in TypeScipt, however calling class methods dynamically seems
    // like one of the few cases when the developer wants to ditch TypeScript.
    const operationKey: ('register' | 'unregister' | 'update' | 'get' | 'delete' | 'publish' | 'subscribe' | 'unsubscribe') = <any>operation;
    const clientToken = this.thingShadows[operationKey](this.thingName, state);

    if (clientToken === null) {
       // The thing shadow operation can't be performed because another one
       // is pending. If no other operation is pending, reschedule it after an 
       // interval which is greater than the thing shadow operation timeout.
       if (this.currentTimeout !== null) {
          console.log('Operation in progress, scheduling retry...');
          this.currentTimeout = setTimeout(
             function() {
                this.thingShadowOperation(operation, state);
             },
             this.operationTimeout + this.operationTimeoutRetry);
       }
    } else {
       // Save the client token so that we know when the operation completes.
       this.clientToken = clientToken;
    }
 }

  /**
   * Generate a unique MQTT client id so not to collide with other ids in use.
   */
  // TODO
  createUniqueClientId = (): string => {
    return 'temporaryClientIdWillBeMoreUniqueInTheFuture';
  }
}

模型/数据上传器-options.ts

// DataUploader options
export interface DataUploaderOptions {
  // AWS IoT endpoint found in IoT settings in management console
  awsEndpoint: string;

  // AWS IoT private certificate for single device
  awsPrivateCert: string;

  // AWS IoT CA certificate
  awsCaCert: string;

  // AWS IoT thing certificate for single device
  awsThingCert: string;

  // AWS IoT region where thing settings live
  awsRegion: string;

  // an MQTT client id that needs to be unique amongst all other client ids
  awsMqttClientId?: string;
}

我缺少什么来更新影子?


更新:

当我直接使用证书而不是 base64 字符串版本时,我得到相同的超时结果。例子:

    const thingShadowOptions = {
      keyPath: '/Users/me/Downloads/private-key-aaaaaaaaaa-private.pem.key',
      certPath: '/Users/me/Downloads/thing-aaaaaa-certificate.pem.crt',
      caPath: '/Users/me/Downloads/ca-AmazonRootCA1.pem',
      clientId: this.options.awsMqttClientId,
      host: this.options.awsEndpoint,
      region: this.options.awsRegion
    }

另外,我可以订阅一个 MQTT 主题并发布到它:

    this.thingShadows.on('connect', () => {
      console.log('connected');
      // this.registerThingShadowInterest();

      // TEMP
      this.thingShadows.subscribe('topic_1');
      this.thingShadows.publish('topic_1', JSON.stringify({ test_data: 1}));
    });

输出:

message received for topic_1: {"type":"Buffer","data":[123,34,116,101,115,116,95,100,97,116,97,34,58,49,125]}
4

1 回答 1

5

调试影子更新

您可以订阅保留的主题$aws/things/+/shadow/#来调试问题。

这将显示 400 错误和一条消息。

在此处输入图像描述

修复更新负载

错误信息是:

“消息”:“缺少所需节点:状态”

这在字符串化更新有效负载中可见。但是stateObject传递给的参数thingShadow.update()应该是一个对象而不是字符串。

所以从以下位置删除JSON.stringify

const cameraState = JSON.stringify({
  state: {
    desired: {
      signedUrlRequests: 10,
      signedUrls: [],
      startedAt: new Date(),
      uploadCount: 0,
      uploadSpeed: 0
    }
  }
});

并将其更改为一个对象:

const cameraState = {
  state: {
    desired: {
      signedUrlRequests: 10,
      signedUrls: [],
      startedAt: new Date(),
      uploadCount: 0,
      uploadSpeed: 0
    }
  }
};

请参阅https://github.com/aws/aws-iot-device-sdk-js#update上的文档

awsIot.thingShadow#update(thingName, stateObject)

使用 JavaScript 对象 stateObject 中指定的状态更新名为 thingName 的事物影子。

查看结果

修复参数格式后,更新在事物的阴影中可见:

在此处输入图像描述

于 2020-04-30T13:06:38.240 回答