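Very new to IoT Core. So I've basically just repurposed the AWS SDK sample pubsub.py code (https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/samples/pubsub.py), but for some reason, even though I can establish a connection to AWS, I am unable to:

  • Subscribe to a topic

When executing iot_connection.subscribe(), the terminal displays "Subscribing to topic test/topic..." and hangs indefinitely. Note that if I leave subscribe_result = subscribe_future.result() out of the full code below, it executes successfully, but, again, no messages are received in the console. Side question: do you actually have to subscribe to a topic in order to publish to it?

  • See messages in the console

When executing iot_connection.publish(), everything appears to run successfully; however, no "Hello, World" shows up in the AWS test console, even though I'm subscribed to "#" and "test/topic".

Any help would be greatly appreciated!

Code: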

import argparse
from uuid import uuid4
import json
import time
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder


def make_parser():
    parser = argparse.ArgumentParser(description="Send and receive messages through an MQTT connection.")
    parser.add_argument('endpoint', help="Your AWS IoT custom endpoint, not including a port.")
    parser.add_argument('--port', type=int, help="Specify port. AWS IoT supports 443 and 8883.", metavar='')
    parser.add_argument('--cert', help="File path to your client certificate, in PEM format.", metavar='')
    parser.add_argument('--key', help="File path to your private key, in PEM format.", metavar='')
    parser.add_argument('--root-ca', help="File path to root certificate authority, in PEM format.", metavar='')
    parser.add_argument('--client-id', default="test-" + str(uuid4()), help="Client ID for MQTT connection.",
                        metavar='')
    parser.add_argument('--topic', default="test/topic", help="Topic to subscribe to, and publish messages to.",
                        metavar='')
    parser.add_argument('--message', default="Hello World!", help="Message to publish. ", metavar='')
    parser.add_argument('--count', default=10, type=int, help="Number of messages to publish.", metavar='')
    return parser


class IoT:

    def __init__(self, args):
        self.endpoint = args.endpoint
        self.port = args.port
        self.cert = args.cert
        self.key = args.key
        self.root_ca = args.root_ca
        self.client_id = args.client_id
        self.topic = args.topic
        self.message = args.message
        self.count = args.count
        print("Initializing parameters...")

    def __enter__(self):
        print("Spinning up resources...")
        self.event_loop_group = io.EventLoopGroup(1)
        self.host_resolver = io.DefaultHostResolver(self.event_loop_group)
        self.client_bootstrap = io.ClientBootstrap(self.event_loop_group, self.host_resolver)
        print("Establishing connection to AWS...")
        self.mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=self.endpoint,
            port=self.port,
            cert_filepath=self.cert,
            pri_key_filepath=self.key,
            ca_filepath=self.root_ca,
            client_id=self.client_id,
            client_bootstrap=self.client_bootstrap,
            clean_session=False,
            keep_alive_secs=30
        )
        connect_future = self.mqtt_connection.connect()
        connect_future.result()
        print("Connected!")

    def __exit__(self, exception_type, exception_value, traceback):
        print("Disconnecting...")
        disconnect_future = self.mqtt_connection.disconnect()
        disconnect_future.result()
        print("Disconnected!")

    def subscribe(self):
        print("Subscribing to topic {}...".format(self.topic))
        subscribe_future, packet_id = self.mqtt_connection.subscribe(
            topic=self.topic,
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_message_received
        )
        # Blocks until the broker acknowledges the subscription (SUBACK)
        subscribe_result = subscribe_future.result()
        print("Result: {}".format(str(subscribe_result['qos'])))
        print("Subscribed!")

    # The parameter must be spelled "payload": the SDK passes it by keyword
    def on_message_received(self, topic, payload, dup, qos, retain, **kwargs):
        print("Received message from topic '{}': {}".format(topic, payload))

    def publish(self):
        if self.message:
            print("Publishing message to topic '{}': {}".format(self.topic, self.message))
            message = "{} [{}]".format(self.message, self.count)
            message_json = json.dumps(message)
            self.mqtt_connection.publish(
                topic=self.topic,
                payload=message_json,
                qos=mqtt.QoS.AT_LEAST_ONCE
            )
            time.sleep(1)  # thought this might fix things, it did not


if __name__ == '__main__':
    print("Gathering user inputs...")
    parser = make_parser()
    args = parser.parse_args()
    # Starting IoT Core connection
    iot_connection = IoT(args)
    with iot_connection:
        iot_connection.subscribe()  # subscribing to topic
        iot_connection.publish()  # publishing message hopefully
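
When a call such as subscribe_future.result() hangs like this, a bounded wait can at least turn the hang into a visible error. The following is a minimal sketch, assuming the futures returned by the SDK behave like standard concurrent.futures.Future objects; the helper name wait_for and the 10-second default are hypothetical and not part of the SDK.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeoutError


def wait_for(future, description, timeout_secs=10.0):
    """Wait for a future, but give up after timeout_secs instead of hanging.

    `wait_for` is a hypothetical helper for illustration, not an SDK function.
    """
    try:
        return future.result(timeout=timeout_secs)
    except FutureTimeoutError:
        # No acknowledgement arrived in time; one possible cause is an
        # IoT policy that does not allow the requested action.
        raise RuntimeError(
            "{} did not complete within {} s".format(description, timeout_secs)
        ) from None


if __name__ == "__main__":
    # Stand-in future that never completes, to mimic the hang described above.
    never_done = Future()
    try:
        wait_for(never_done, "subscribe", timeout_secs=0.1)
    except RuntimeError as err:
        print(err)
```

In the class above, this would be used as subscribe_result = wait_for(subscribe_future, "subscribe") in place of the unbounded subscribe_future.result().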
1 Answer

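Solved it in the meantime. It must have been an issue with my resource ARN: when I created a new policy and set the resource ARN to "*", everything worked as expected. Previously I had tried to restrict the ARN to a specific client ID. Not sure why that didn't work, since, as far as I can tell, I was handling the client ID correctly.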
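
One possible explanation, offered as a guess rather than a confirmed diagnosis: in AWS IoT policies, each action is matched against a different resource type. iot:Connect is checked against a client/... ARN, iot:Subscribe against a topicfilter/... ARN, and iot:Publish and iot:Receive against a topic/... ARN. A policy whose only resource is a client ARN would therefore allow connecting but not subscribing or publishing, which matches the symptoms in the question. The sketch below builds a narrower alternative to "*"; the function name build_policy and the region, account ID, and client ID values are hypothetical placeholders.

```python
import json


def build_policy(region, account_id, client_id, topic):
    """Return an AWS IoT policy document (as a dict) for one client and one topic.

    `build_policy` is a hypothetical helper; all argument values are placeholders.
    """
    prefix = "arn:aws:iot:{}:{}".format(region, account_id)
    return {
        "Version": "2012-10-17",
        "Statement": [
            # Connecting is authorized against the client ID
            {"Effect": "Allow", "Action": "iot:Connect",
             "Resource": "{}:client/{}".format(prefix, client_id)},
            # Subscribing is authorized against a topic *filter*
            {"Effect": "Allow", "Action": "iot:Subscribe",
             "Resource": "{}:topicfilter/{}".format(prefix, topic)},
            # Publishing and receiving are authorized against the topic itself
            {"Effect": "Allow", "Action": ["iot:Publish", "iot:Receive"],
             "Resource": "{}:topic/{}".format(prefix, topic)},
        ],
    }


if __name__ == "__main__":
    policy = build_policy("us-east-1", "123456789012", "my-client", "test/topic")
    print(json.dumps(policy, indent=2))
```

Note also that the script in the question defaults the client ID to "test-" plus a random UUID, so a policy restricted to one specific client ID would only match if --client-id is passed explicitly with that value. On the side question: MQTT does not require a client to subscribe to a topic before publishing to it; the policy only has to allow iot:Publish on that topic.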

Answered 2021-08-13T19:36:18.240