
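I am trying to write a Python program that updates all device twins of a given IoT hub, using only the hub's connection string. The code given in "Get started with device twins (Python)" is out of date and crashes. Has anyone gotten this to work?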


2 Answers


In addition to @silent's comment, Azure IoT Hub has a built-in special job for bulk import and export of IoT Hub device identities.

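This feature allows updating device twins, including their reported properties. The bulk job is submitted to the service through the REST API Create Import Export Job.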

The bulk job is described in a blob file with one line per device. The following line is an example for device1 using the updateTwin import mode:

{ "id":"device1", "importMode":"updateTwin", "status":"enabled", "tags":{},"properties":{"desired":{ "key1":12},"reported":{ "key2":null, "key3":"abcd"} }}
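
Since the blob holds one JSON object per line, the file for many devices can be generated rather than written by hand. The following is a minimal sketch, assuming the device IDs are already known; the function name `build_update_twin_lines` and the sample device IDs are hypothetical, and the field names are copied from the example line above.

```python
import json


def build_update_twin_lines(device_ids, desired, tags=None):
    """Return blob text with one 'updateTwin' JSON entry per device, one per line."""
    lines = []
    for device_id in device_ids:
        entry = {
            "id": device_id,
            "importMode": "updateTwin",
            "status": "enabled",
            "tags": tags or {},
            "properties": {"desired": desired},
        }
        lines.append(json.dumps(entry))
    return "\n".join(lines)


# Hypothetical device IDs, used only for illustration
blob_text = build_update_twin_lines(["device1", "device2"], {"key1": 12})
print(blob_text)
```

The resulting text would then be uploaded as the input blob referenced by the import job.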

The following code snippet shows how to call this REST service to update all the device twins described in inputBlobName:

import requests
import json
import time
import urllib.parse
import hmac
import hashlib
import base64


# Example of the blob content for one device
# { "id":"device1", "importMode":"updateTwin", "status":"enabled", "tags":{},"properties":{"desired":{ "key1":12},"reported":{ "key2":null, "key3":"abcd"  } } }


# IoT Hub and Blob Storage
iothub_connection_string = "<IoTHubConnectionString>"
blobContainerUri = "<blobContainerUriSaS>"
inputBlobName = "<inputBlobName>"

def get_sas_token(resource_uri, sas_name, sas_value):
    # Sign "<resource_uri>\n<expiry>" with the base64-decoded shared access key (HMAC-SHA256)
    sas = base64.b64decode(sas_value.encode('utf-8'))
    expiry = str(int(time.time() + 10000))  # token is valid for 10000 seconds
    string_to_sign = (resource_uri + '\n' + expiry).encode('utf-8')
    signed_hmac_sha256 = hmac.HMAC(sas, string_to_sign, hashlib.sha256)
    # percent-encode the base64 signature, including any '/' characters
    signature = urllib.parse.quote(base64.b64encode(signed_hmac_sha256.digest()), safe='')
    return "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(resource_uri, signature, expiry, sas_name)

# iothub_sas_token from connection string
cs = dict(map(lambda x: x.split('=',1), iothub_connection_string.split(';')))
iothub_namespace = cs["HostName"].split('.')[0]
sas_token = get_sas_token(cs["HostName"], cs["SharedAccessKeyName"], cs["SharedAccessKey"])

# REST API see doc: https://docs.microsoft.com/en-us/rest/api/iothub/digitaltwinmodel/service/createimportexportjob
uri = "https://{}.azure-devices.net/jobs/create?api-version=2018-06-30".format(iothub_namespace)
headers = { 'Authorization':sas_token, 'Content-Type':'application/json;charset=utf-8' }
payload = { "inputBlobContainerUri": blobContainerUri, "outputBlobContainerUri": blobContainerUri, "inputBlobName": inputBlobName, "type":"import" }
res = requests.post(uri, data = json.dumps(payload), headers = headers)

print(res)

# check if the job has been accepted
if res.status_code != 200:
    raise SystemExit("Job was not accepted: {} {}".format(res.status_code, res.text))

# poll the job status until it leaves the enqueued/running states
jobId = json.loads(res.content)["jobId"]
uri = "https://{}.azure-devices.net/jobs/{}?api-version=2018-06-30".format(iothub_namespace, jobId)
status = "unknown"
while True:
    time.sleep(2)
    res = requests.get(uri, headers=headers)
    if res.status_code != 200:
        break
    status = json.loads(res.content)["status"]
    if status not in ("enqueued", "running"):
        break
    print(".", end="", flush=True)
print("Done, last status: {}".format(status))
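
The polling loop above can also be factored into a small helper with a timeout, so that a stuck job does not block forever. This is only a sketch: `wait_for_job`, `IN_PROGRESS` and the fake status sequence are hypothetical names for illustration, and treating "enqueued" and "running" as the only non-terminal states is an assumption about the job status values.

```python
import time

# Assumption: these are the job states that mean "not finished yet"
IN_PROGRESS = {"enqueued", "running"}


def wait_for_job(fetch_status, interval=2.0, timeout=600.0, sleep=time.sleep):
    """Call fetch_status() until the job leaves IN_PROGRESS or the timeout elapses."""
    waited = 0.0
    while True:
        status = fetch_status()
        if status not in IN_PROGRESS:
            return status
        if waited >= timeout:
            raise TimeoutError("job still {} after {} seconds".format(status, waited))
        sleep(interval)
        waited += interval


# Stand-in for the real GET request against the job URI
fake_statuses = iter(["enqueued", "running", "running", "completed"])
final_status = wait_for_job(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(final_status)
```

In the real script, `fetch_status` would wrap the `requests.get` call and return the "status" field of the response.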

To create the blob container, generate its SAS URI, and so on, you can use Microsoft Azure Storage Explorer.

answered 2019-11-01T05:09:13.627

With the help of @Roman Kiss, I used the Create Job API request, which does not require storage.

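import base64
import hashlib
import hmac
import json
import time
import urllib.parse

import requests

# Fill in these placeholders before running
JOB_ID = "<INSERT_JOB_ID>"
CONNECTION_STRING = "<INSERT_CONNECTION_STRING>"
IOT_HUB_NAME = "<INSERT_IOT_HUB_NAME>"
DESIRED_PROPERTIES = {}  # <INSERT_YOUR_DESIRED_PROPERTIES>

def create_device_twin():
    # Request body for a scheduled twin update job
    return {
        "jobId": JOB_ID,
        "type": "scheduleUpdateTwin",
        "updateTwin": {"properties": {"desired": DESIRED_PROPERTIES}, "etag": "*"},
    }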

def get_sas_token():
    cs = dict(map(lambda x: x.split('=', 1), CONNECTION_STRING.split(';')))
    resource_uri = cs["HostName"]
    sas_name = cs["SharedAccessKeyName"]
    sas_value = cs["SharedAccessKey"]
    sas = base64.b64decode(sas_value.encode('utf-8'))
    expiry = str(int(time.time() + 10000))
    string_to_sign = (resource_uri + '\n' + expiry).encode('utf-8')
    signed_hmac_sha256 = hmac.HMAC(sas, string_to_sign, hashlib.sha256)
    signature = urllib.parse.quote(base64.b64encode(signed_hmac_sha256.digest()))
    return "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(resource_uri, signature, expiry, sas_name)

if __name__ == '__main__':
    uri = "https://{}.azure-devices.net/jobs/v2/{}?api-version=2018-06-30".format(IOT_HUB_NAME, JOB_ID)
    sas_token = get_sas_token()
    headers = {'Authorization': sas_token, 'Content-Type': 'application/json;charset=utf-8'}
    res = requests.put(uri, headers=headers, data=json.dumps(create_device_twin()))
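
The request body above carries only the job ID, the job type and the twin patch. A job request can also say which devices to target and when to start. The following is a hedged sketch of such a body: the function name `build_twin_job_request` is hypothetical, and the `queryCondition`, `startTime` and `maxExecutionTimeInSeconds` fields and the sample query string are assumptions to check against the Create Job REST reference for your API version.

```python
import json
from datetime import datetime, timezone


def build_twin_job_request(job_id, desired, query_condition, start_time=None, max_seconds=300):
    """Build a scheduleUpdateTwin job body (targeting fields are assumptions)."""
    start_time = start_time or datetime.now(timezone.utc)
    return {
        "jobId": job_id,
        "type": "scheduleUpdateTwin",
        "queryCondition": query_condition,          # which devices the job applies to
        "startTime": start_time.isoformat(),        # ISO 8601 timestamp
        "maxExecutionTimeInSeconds": max_seconds,
        "updateTwin": {"etag": "*", "properties": {"desired": desired}},
    }


# Hypothetical job ID, device ID and property values, for illustration only
body = build_twin_job_request("job1", {"key1": 12}, "deviceId IN ['device1']")
print(json.dumps(body, indent=2))
```

The dictionary would be passed to `requests.put` in place of `create_device_twin()`.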
answered 2019-11-04T08:05:52.720