0

我一次从我的频道中的 IoT 设备接收多个数据记录作为 JSON 数组。收到的消息如下所示:

[
    {
      "Field1": "Value1",
      "Field2": "Value2",
      "Field3": "Value3"
    },
    {
      "Field1": "AnotherValue1",
      "Field2": "AnotherValue2",
      "Field3": "AnotherValue3"
    }
]

我使用以下 SQL 查询创建了一个数据集:

SELECT * FROM mydatastore

当我运行数据集时,返回的结果是:

array                                              __dt 
-----                                              -----
[{field1=Value1, field2=Value2, field3=Value3}]    2019-02-21 00:00:00.000

我想要的结果是:

Field1           Field2           Field3
------           ------           ------
Value1           Value2           Value3
AnotherValue1    AnotherValue2    AnotherValue3

如何让 IoT Analytics 在数据存储中为接收到的 JSON 数组中的每个元素创建一个新行?

4

2 回答 2

0

如何让 IoT Analytics 在数据存储中为接收到的 JSON 数组中的每个元素创建一个新行?

最简单的方法应该是在您的管道上利用Lambda 活动,并让它将单个 JSON 有效负载解析为所需的结构。这在某种程度上取决于发送到 Channel 的消息的“原始”结构。

因此,例如,我们可以通过CLI batch-put-message向 Channel 发送数据,如下所示:

aws iotanalytics batch-put-message --channel-name sample_channel --messages '[{"messageId": "message1", "payload": "{\"array\": [{\"Field1\": \"Value1\", \"Field2\": \"Value2\", \"Field3\": \"Value3\"},{\"Field1\": \"AnotherValue1\", \"Field2\": \"AnotherValue2\", \"Field3\": \"AnotherValue3\"}]}"}]'

然后,Channel 将具有如下结构的单个消息:

{
  "messageId": "message1",
  "payload": {
    "array": [
      {
        "Field1": "Value1",
        "Field2": "Value2",
        "Field3": "Value3"
      },
      {
        "Field1": "AnotherValue1",
        "Field2": "AnotherValue2",
        "Field3": "AnotherValue3"
      }
    ]
  }
}

如果您的管道有 Lambda 活动,则来自通道的消息将在参数中传递给您的 Lambda 函数event

我使用 AWS Lambda 控制台内联编辑器创建了一个简单的 Lambda 函数(使用 Python 3.7),并将其命名为sample_lambda

import json
import sys
import logging

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)


def lambda_handler(event, context):
    # This can be handy to see the raw structure of the incoming event
    # will log to the matching CloudWatch log:
    # /aws/lambda/<name_of_the_lambda>
    # logger.info("raw event: {}".format(event))

    parsed_rows = []

    # Depending on the batchSize setting of the Lambda Pipeline Activity,
    # you may receive multiple messages in a single event
    for message_payload in event:
        if 'array' in message_payload:
            for row in message_payload['array']:
                parsed = {}
                for key, value in row.items():
                    parsed[key] = value
                parsed_rows.append(parsed)

    return parsed_rows

我添加了适当的权限,以便 IoT-Analytics 可以通过 CLI 调用 lambda 函数:

aws lambda add-permission --function-name sample_lambda --statement-id statm01 --principal iotanalytics.amazonaws.com --action lambda:InvokeFunction

重新处理Pipeline,解析后的行放入DataStore;执行数据集,我得到这个净结果:

"array","field1","field2","field3","__dt"
,"Value1","Value2","Value3","2019-04-26 00:00:00.000"
,"AnotherValue1","AnotherValue2","AnotherValue3","2019-04-26 00:00:00.000"
于 2019-04-26T17:22:07.670 回答
0

迟回复:

想想看,物联网规则选择语句有很多内置的子句/函数,比如能够转换输入有效负载的“as”。请检查,可能会解决。

示例 - https://docs.aws.amazon.com/iot/latest/developerguide/iot-sql-select.html

在主题“主题/子主题”上发布的传入有效负载:{“颜色”:{“红色”:255,“绿色”:0,“蓝色”:0},“温度”:50}

SQL:

SELECT color.red as red_value FROM 'topic/subtopic' Outgoing payload: {"red_value":255}

于 2020-10-17T15:18:22.803 回答