0

在我的代码中,我订阅了 3 个不同的主题:

/devices/{}/config
/devices/{}/events
/devices/{}/state

如果我没有订阅 /devices/{}/config 我不会收到任何配置消息,那没关系。但是,如果我在 /devices/{}/config 中订阅了我的每个订阅,我都会收到一条消息。

例子:

在“/devices/{}/config”和“/devices/{}/events”中订阅了 2 条配置消息。

在“/devices/{}/config”和“/devices/{}/state”中订阅了 2 条配置消息。

在“/devices/{}/config”、“/devices/{}/state”和“/devices/{}/events”中订阅了 3 条配置消息。

在“/devices/{}/events”和“/devices/{}/state”中订阅,我收到 0 条配置消息。

这导致 IoT 核心出现错误:mqtt:

无法更新设备“xxxxxxxx”。设备状态每 1s 只能更新一次。

事实上,我想要并且只需要一个配置消息。我究竟做错了什么?

这是我的代码:

# [START iot_mqtt_includes]
import argparse
import datetime
import os
import random
import ssl
import time
import log
import updateConfig
import jwt
import paho.mqtt.client as mqtt
import payload
# [END iot_mqtt_includes]

# The initial backoff time after a disconnection occurs, in seconds.
minimum_backoff_time = 1

# The maximum backoff time before giving up, in seconds.
MAXIMUM_BACKOFF_TIME = 32

# Whether to wait with exponential backoff before publishing.
should_backoff = False


# [START iot_mqtt_jwt]

def create_jwt(project_id, private_key_file, algorithm):

    token = {
            # The time that the token was issued at
            'iat': datetime.datetime.utcnow(),
            # The time the token expires.
            'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=60),
            # The audience field should always be set to the GCP project id.
            'aud': project_id
    }

    # Read the private key file.
    with open(private_key_file, 'r') as f:
        private_key = f.read()
        f.close()

    return jwt.encode(token, private_key, algorithm=algorithm)
# [END iot_mqtt_jwt]


# [START iot_mqtt_config]
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return '{}: {}'.format(rc, mqtt.error_string(rc))


def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    log.append_log('ao_conectar - ' +  mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1


def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    append_log('ao_desconectar - '+ error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True


def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""

def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    config = str(message.payload)
    retorno = updateConfig.update(config)

def get_client(
        project_id, cloud_region, registry_id, device_id, private_key_file,
        algorithm, ca_certs, mqtt_bridge_hostname, mqtt_bridge_port):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client = mqtt.Client(
            client_id=('projects/{}/locations/{}/registries/{}/devices/{}'
                       .format(
                               project_id,
                               cloud_region,
                               registry_id,
                               device_id)))

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
            username='unused',
            password=create_jwt(
                    project_id, private_key_file, algorithm))

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = '/devices/{}/config'.format(device_id)

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    return client
# [END iot_mqtt_config]


def parse_command_line_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description=(
            'Example Google Cloud IoT Core MQTT device connection code.'))
    parser.add_argument(
            '--project_id',
            default=os.environ.get('GOOGLE_CLOUD_PROJECT'),
            help='GCP cloud project name')
    parser.add_argument(
            '--registry_id', required=True, help='Cloud IoT Core registry id')
    parser.add_argument(
            '--device_id', required=True, help='Cloud IoT Core device id')
    parser.add_argument(
            '--private_key_file',
            required=True, help='Path to private key file.')
    parser.add_argument(
            '--algorithm',
            choices=('RS256', 'ES256'),
            required=True,
            help='Which encryption algorithm to use to generate the JWT.')
    parser.add_argument(
            '--cloud_region', default='us-central1', help='GCP cloud region')
    parser.add_argument(
            '--ca_certs',
            default='roots.pem',
            help=('CA root from https://pki.google.com/roots.pem'))
    parser.add_argument(
            '--message_type',
            choices=('event', 'state'),
            default='event',
            help=('Indicates whether the message to be published is a '
                  'telemetry event or a device state message.'))
    parser.add_argument(
            '--mqtt_bridge_hostname',
            default='mqtt.googleapis.com',
            help='MQTT bridge hostname.')
    parser.add_argument(
            '--mqtt_bridge_port',
            choices=(8883, 443),
            default=8883,
            type=int,
            help='MQTT bridge port.')
    parser.add_argument(
            '--jwt_expires_minutes',
            default=20,
            type=int,
            help=('Expiration time, in minutes, for JWT tokens.'))

    return parser.parse_args()


# [START iot_mqtt_run]
def main():
    log.append_log("Iniciando uma nova conexao com o Google IoT.")
    global minimum_backoff_time

    args = parse_command_line_args()

    # Publish to the events or state topic based on the flag.

    jwt_iat = datetime.datetime.utcnow()
    jwt_exp_mins = args.jwt_expires_minutes
    client = get_client(
        args.project_id, args.cloud_region, args.registry_id, args.device_id,
        args.private_key_file, args.algorithm, args.ca_certs,
        args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    # Publish num_messages mesages to the MQTT bridge once per second.
    while True:
        # Process network events.
        client.loop()
        # Wait if backoff is required.
        if should_backoff:
            # If backoff time is too large, give up.
            if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
                log.append_log('Tempo maximo de backoff excedido. Desistindo.')
                break

            # Otherwise, wait and connect again.
            delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
            log.append_log('Esperando {} segundos antes de reconectar.'.format(delay))
            time.sleep(delay)
            minimum_backoff_time *= 2
            client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

        # [START iot_mqtt_jwt_refresh]
        seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
        if seconds_since_issue > 60 * jwt_exp_mins:
            log.append_log('Atualizando token de acesso depois de {} segundos'.format(seconds_since_issue))
            client.loop_stop()
            jwt_iat = datetime.datetime.utcnow()
            client = get_client(
                args.project_id, args.cloud_region,
                args.registry_id, args.device_id, args.private_key_file,
                args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
                args.mqtt_bridge_port)
        # [END iot_mqtt_jwt_refresh]
        # Publish "payload" to the MQTT topic. qos=1 means at least once
        # delivery. Cloud IoT Core also supports qos=0 for at most once
        # delivery.

        payloadToPublish = payload.lerPayload()

        if payloadToPublish != 'sem payload':
            if payloadToPublish[0] == 'event':
                mqtt_topic = '/devices/{}/{}'.format(args.device_id, 'events')
                log.append_log('publicando [' + payloadToPublish[1] + ']')
            else:
                mqtt_topic = '/devices/{}/{}'.format(args.device_id, 'state')

            client.publish(mqtt_topic, payloadToPublish[1], qos=1)
# [END iot_mqtt_run]


if __name__ == '__main__':
    main()
4

1 回答 1

0

这是预期的行为:

  • 事件和状态主题用于设备到云的通信,配置主题用于将配置数据发送到物联网设备。订阅事件/状态主题实际上是一个 NOOP。
  • Cloud IoT Core 仅支持 QoS 1,即“至少一次”用于消息传输,您可能想尝试 QoS 0,直到 ACK 才重试消息传输,但我认为这不是您想要的
  • STATE 和 CONFIG 传输仅限于每秒一条消息;比这更快地传输配置更改将导致您看到的错误
于 2018-08-14T21:51:59.403 回答