0

我正在关注本教程,并且我的代码已经将消息发布到/devices/sm1/events主题,其中sm1是我的设备 ID。

我想知道如何订阅这个主题,因为教程说要使用/devices/sm1/config,但我收到的是空消息。我已经尝试使用发布中使用的相同“路径”(/devices/sm1/events),但它也不起作用。

奇怪的是,我给主题的名称是sm1,而与我的设备关联的主题在 GoogleIoT 控制台上显示为projects/myprojectname/topics/sm1. 因此,除了发现如何订阅提到的主题之外,我还感谢任何与在 GoogleIoT 中使用 pub/sub 主题的正确方法相关的解释(文档不是很清楚)。

这是我的subscribe.py

mqtt_url = "mqtt.googleapis.com"
mqtt_port = 8883
topic = "/devices/sm1/config"

def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)

def on_message(client, userdata, msg):
    print("Topic: {0}  --  Payload: {1}".format(msg.topic, msg.payload))

if __name__ == "__main__":    
    client = mqtt.Client("projects/{}/locations/{}/registries/{}/devices/{}".format(
                         project_id,
                         cloud_region,
                         registry_id,
                         device_id))

    client.username_pw_set(username='unused',
                           password=jwt_maker.create_jwt(project_id,
                                               private_key,
                                               algorithm="RS256"))

    client.tls_set(root_ca,
                   certfile = public_crt,
                   keyfile = private_key,
                   cert_reqs = ssl.CERT_REQUIRED,
                   tls_version = ssl.PROTOCOL_TLSv1_2,
                   ciphers = None)


    client.on_connect = on_connect
    client.on_message = on_message

    print "Connecting to Google IoT Broker..."
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.loop_forever()

我的输出:

连接状态:0
主题:/devices/sm1/config -- 有效负载:
主题:/devices/sm1/config -- 有效负载:

4

3 回答 3

2

在阅读了@GabeWeiss 回答中评论部分的讨论之后,我认为您想要实现的目标以及您尝试使用 Pub/Sub 的方式(或目的)有点混乱。

鉴于我认为这里的问题更具概念性,让我首先向您介绍有关Cloud IoT Core 关键概念的通用文档页面,您实际上可以在其中找到一些有关 Cloud IoT Core 和 Pub/Sub 之间关系的信息。综上所述,设备遥测数据发布到 Cloud IoT Core 主题中,稍后通过 Data Broker 发布到 Cloud Pub/Sub 主题中,您可以在外部将其用于其他目的:触发 Cloud Functions,分析数据流数据流等

现在,如果您遵循Cloud IoT Core 的快速入门指南,您将看到如何在给定点创建一个设备注册表,该注册表绑定到发布设备遥测事件的 Pub/Sub 主题。如果您希望写入多个主题,而不是写入默认的 Pub/Sub 主题,则可以按照文档其他部分下的说明进行操作。

最后,谈到订阅主题的问题(Cloud IoT Core 主题,而不是 Pub/Sub 主题,因为只有前者与设备相关),您可以使用以下命令订阅 MQTT 主题,其中(例如),设备正在订阅配置主题,其中发布了配置更新:

# This is the topic that the device will receive configuration updates on.
mqtt_config_topic = '/devices/{}/config'.format(device_id)

# Subscribe to the config topic.
client.subscribe(mqtt_config_topic, qos=1)

然后,您可以使用on_message()功能处理在您实际订阅的主题上发布的消息。请注意,必须将有效负载解析为字符串 ( str(message.payload))。

def on_message(unused_client, unused_userdata, message):
    payload = str(message.payload)
    print('Received message \'{}\' on topic \'{}\' with Qos {}'.format(
            payload, message.topic, str(message.qos)))

然后,在您的问题中,您说您首先订阅了/devices/{device-id}/config,但这可能不是您想要的,因为这是发布配置更新的主题(即不是发布遥测事件的地方)。然后,我知道您应该订阅/devices/{device-id}/events哪个是发布遥测指标的实际 MQTT 主题。如果这不起作用,则可能存在另一个相关问题,因此请确保您message正确解析变量,并可能尝试使用 Pub/Sub 和使用注册表创建的默认主题,以检查遥测指标是否正确发表与否。

于 2018-04-11T11:48:16.323 回答
1

编辑:根据下面的评论澄清......

这里有两个 GCP 组件在起作用。设备使用 MQTT 主题(即 /events 主题)与 IoT Core 对话。然后projects/myprojectname/topics/sm1是 IoT Core 中没有的,它在 Pub/Sub 中。当您向 /events MQTT 主题发送消息时,IoT Core 将发送到 /events MQTT 主题的设备中的有效负载代理到创建并附加到注册设备的 IoT Core 注册表的 Pub/Sub 主题.

要查看这些消息,您必须在主题的 Pub/Sub 中创建订阅projects/myprojectname/topics/sm1。如果你去控制台,然后 Pub/Sub->topics。单击主题旁边的三个点,然后选择“新订阅”。订阅后,您可以从设备发送一些数据,然后在命令行上运行(假设您已安装 gcloud 工具):

gcloud beta pubsub subscriptions pull --max-messages=3 <subscription_id>

要对消息执行任何操作,您可以编写订阅 Pub/Sub 主题的脚本(查看 Pub/Sub API)以触发添加到主题的消息。

原始消息

您是否正在向设备发送配置消息?混淆可能是 MQTT 主题是单向的。

所以:1) /events 主题是针对设备->IoT 核心的。2) /config 主题适用于 IoT Core Admin SDK->device

在某个地方的另一个脚本中,或者从 IoT Core UI 界面中,您需要发送一条配置消息才能正确看到 on_message 触发。

在 IoT Core UI(console.cloud.google.com)中,您可以深入到您已注册的单个设备,然后在屏幕顶部单击“更新配置”。将出现一个弹出窗口,可让您向该设备发送文本或 base64 编码消息,它将出现在 /config 主题中。

于 2018-04-04T16:19:33.823 回答
1

我必须向 Google Pub/Sub 图书馆投降才能接收与我指定主题相关的通知。

我的发布代码(仅重要部分):

mqtt_url = "mqtt.googleapis.com"
mqtt_port = 8883
mqtt_topic = "/devices/sm1/events"

client = mqtt.Client("projects/{}/locations/{}/registries/{}/devices/{}".format(
                     project_id,
                     cloud_region,
                     registry_id,
                     device_id), protocol=mqtt.MQTTv311)

client.username_pw_set(username='unused',
                       password=jwt_maker.create_jwt(project_id,
                                           private_key,
                                           algorithm="RS256"))

client.tls_set(root_ca, 
               certfile = public_crt, 
               keyfile = private_key, 
               cert_reqs = ssl.CERT_REQUIRED, 
               tls_version = ssl.PROTOCOL_TLSv1_2, 
               ciphers = None)

client.connect(mqtt_url, mqtt_port, keepalive=60)
res = client.publish(mqtt_topic, some_data, qos=1)

在 Google Cloud Platform 门户中,我必须在 Pub/Sub 部分创建订阅,将其分配给我创建的主题,这已经是我的默认主题(可能链接到 /events 主题)。创建的订阅格式如下:

projects/project_name/subscriptions/subscription_name

我的订阅代码,使用 Google Pub/Sub 库,因为无法使用 MQTT 协议:

from google.cloud import pubsub

def callback(message):
    print(message.data)
    message.ack()

project_id = "project_name"
subscription_name = "sm1"

subscriber = pubsub.SubscriberClient()
subscription_name = 'projects/{}/subscriptions/{}'.format(project_id, subscription_name)

subscription = subscriber.subscribe(subscription_name)
future = subscription.open(callback)

try:
    future.result()
except Exception as ex:
    subscription.close()
    raise

我希望这可以帮助任何人。更多细节可以在这里找到(github)

于 2018-05-18T17:50:07.373 回答