0

我正在尝试将从 IoT 设备发送的消息存储在 BigQuery 表中。

云架构如下:

本地设备 -> json_message -> mqtt_client -> GC IoT 设备 -> 设备注册表 -> Pub/Sub 主题 -> 带有 Pub/Sub 主题到 BigQuery 模板的数据流 -> BigQuery 表

我已经让这个系统使用这样构造的非嵌套 JSON 消息

json_dict = {"instrument": instrument,
 "spectrum": str(self.spectrum),
 "spectrum_creation_time": self.creation_dt.timestamp(),
 "messenger_creation_time": time.time()}

return json.dumps(json_dict)

BigQuery 中成功存储此数据的表具有以下架构:

   Last modified                  Schema                 Total Rows   Total Bytes   Expiration   Time Partitioning   Clustered Fields   Labels
 ----------------- ------------------------------------ ------------ ------------- ------------ ------------------- ------------------ --------
  04 Sep 00:24:22   |- instrument: string                1277         81897474
                    |- spectrum: string
                    |- spectrum_creation_time: string
                    |- messenger_creation_time: string

现在我试图让这个系统使用嵌套的 JSON 消息,它的构造如下:

json_dict = {'timestamp': 'AUTO',
             'values': [
                        {'id': instrument + '.Time',
                         'v': time.time(),
                         't': time.time()},
                        {'id': instrument + 'Intensity',
                         'v': str(self.spectrum),
                         't': self.creation_dt.timestamp()}
                        ]}

return json.dumps(json_dict)

我正在尝试将其存储在具有以下架构的 BigQuery 表中:

   Last modified               Schema              Total Rows   Total Bytes   Expiration   Time Partitioning   Clustered Fields   Labels
 ----------------- ------------------------------ ------------ ------------- ------------ ------------------- ------------------ --------
  09 Sep 23:56:20   |- timestamp: timestamp        0            0
                    +- values: record (repeated)
                    |  +- time: record
                    |  |  |- id: string
                    |  |  |- v: string
                    |  |  |- t: timestamp
                    |  +- spectrum: record
                    |  |  |- id: string
                    |  |  |- v: string
                    |  |  |- t: timestamp

不幸的是,当我尝试这种方法时,我收到以下错误,该错误由 DataFlow 输出到 BigQuery 中的错误表。

{"errors":[{"debugInfo":"","location":"values[0].v","message":"no such field: v.","reason":"invalid"}],"index":0}
null

解决此问题的最佳方法是什么?我无法更改嵌套的 JSON 结构,因为我正在构建一个测试套件,这是必需的格式。

4

1 回答 1

0

我能够在@Miach Kornfield 的帮助下解决我的问题,他对我的原始问题发表了评论。这是我的解决方案。

我发送到 GCP 的 JSON 数据看起来像

json_dict = {"timestamp": "1631554378.2955232",
             'values': [
                        {"id":"testA.Time",
                         "v": "1631554378.2955232",
                         "t": "1631554378.2955232"},
                        {"id": "testA.Time.Intensity",
                         "v": "[1, 43, 4..]",
                         't': "1631554378.2955232"}
                         ]
            }

我的 bigquery 表的原始架构是

bigquery 的原始架构

或以文本形式

            Schema
------------------------------
 |- timestamp: timestamp
 +- values: record (repeated)
 |  +- time: record
 |  |  |- id: string
 |  |  |- v: string
 |  |  |- t: timestamp
 |  +- spectrum: record
 |  |  |- id: string
 |  |  |- v: string
 |  |  |- t: timestamp

有效的模式是

有效的模式

或以文本形式

            Schema
------------------------------
 |- timestamp: timestamp
 +- values: record (repeated)
 |  |- id: string
 |  |- v: string
 |  |- t: timestamp

通过指示值的类型是记录(重复),这意味着它是一个结构数组,结构的结构由子列指定。细节(结构的结构由子列指定)对我来说并不明显,以及为什么我在解决这个问题时遇到了这么多麻烦。我不确定是否可以使用异构模式记录(重复)。

于 2021-09-14T21:41:29.387 回答