0

我有一个 AWS IoT 规则,它将传入的 JSON 发送到 Kinesis Firehose。

来自我的 IoT 发布的 JSON 数据都集中在一行上 - 例如:

{"count":4950, "dateTime8601": "2017-03-09T17:15:28.314Z"}

管理 UI 中的 IoT“测试”部分允许您发布消息,默认为以下(注意格式化的多行 JSON):

{
  "message": "Hello from AWS IoT console"
}

我将 Firehose 流式传输到 S3,然后由 EMR 转换为柱状格式,最终由 Athena 使用。

问题是,在转换为列格式期间,Hive(特别是JSON SerDe)无法处理跨越多行的 JSON 对象。它会破坏转换,而不是转换良好的单行 JSON 记录。

我的问题是

  • 如何设置 FireHose 以忽略多行 JSON?
  • 如果不可能,如何告诉 Hive 在加载到表之前删除换行符,或者至少捕获异常并尝试继续?

在定义 Hive 表时,我已经尝试忽略格式错误的 JSON:

DROP TABLE site_sensor_data_raw;
CREATE EXTERNAL TABLE site_sensor_data_raw (
 count int,     
 dateTime8601 timestamp
)
PARTITIONED BY(year int, month int, day int, hour int)
ROW FORMAT  serde 'org.apache.hive.hcatalog.data.JsonSerDe'
with serdeproperties (
 'ignore.malformed.json' = 'true',
 "timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis"
)
LOCATION 's3://...';

这是我进行转换的完整 HQL:

--Example of converting to OEX/columnar formats
DROP TABLE site_sensor_data_raw;
CREATE EXTERNAL TABLE site_sensor_data_raw (
    count int,
    dateTime8601 timestamp
)
PARTITIONED BY(year int, month int, day int, hour int)
ROW FORMAT  serde 'org.apache.hive.hcatalog.data.JsonSerDe'
with serdeproperties (
 'ignore.malformed.json' = 'true',
 "timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis"
)
LOCATION 's3://bucket.me.com/raw/all-sites/';

ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='15') location 's3://bucket.me.com/raw/all-sites/2017/03/09/15';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='16') location 's3://bucket.me.com/raw/all-sites/2017/03/09/16';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='17') location 's3://bucket.me.com/raw/all-sites/2017/03/09/17';

DROP TABLE to_orc;
CREATE EXTERNAL TABLE to_orc (
      count int,
      dateTime8601 timestamp
)
STORED AS ORC
LOCATION 's3://bucket.me.com/orc'
TBLPROPERTIES ("orc.compress"="ZLIB");

INSERT OVERWRITE TABLE to_orc SELECT count,dateTime8601 FROM site_sensor_data_raw where year=2017 AND month=03 AND day=09 AND hour=15;
4

1 回答 1

3

嗯,EMR 和 Athena 上使用的默认 JSON SerDe 无法处理多行 json 记录。每条 JSON 记录都应该在一行上。

在多行 JSON 上,我从 Hive/Hadoop 甚至 Presto(在 Athean 中使用)的角度看到了两个问题

  • 给定一个文件,很明显 Hive/Hadoop 和 JSON serde 将无法识别 json 记录的结尾和开头以返回其对象表示。
  • 给定多个文件,多行 JSON 文件不像普通 /n 分隔的 JSON 文件那样可拆分。

要从 EMR/Athena 端解决这个问题,您需要根据您的数据结构编写自己的自定义 SerDe 并捕获异常等。

如何设置 FireHose 以忽略多行 JSON?

Firehose 无法忽略特定格式。它将使用其 API(PutRecord 或 PutRecordBatch)作为数据块将放入的任何内容发送到目的地。

http://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html

无论如何,AWS Firehose使用 AWS Lambda 提供数据转换,您可以使用 Lambda 函数在 Firehose 上转换您的数据传入数据并将转换后的数据发送到目的地。因此,您可以使用该功能预先识别和展平多行 JSON。如果记录格式不正确,您也可能会删除记录等。您将需要探索 IOT 如何将多行 json 数据发送到 firehose(如逐行等)以编写自己的函数。

https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/

如果不可能,如何告诉 Hive 在加载到表之前删除换行符,或者至少捕获异常并尝试继续?

如果您的 firehose 目的地中仍有多行 JSON,由于您的 ETL 中有 EMR,您可以使用其计算而不是 Lambda 来展平 JSON。spark上的这个特性也可以帮助你。 https://issues.apache.org/jira/browse/SPARK-18352

然后,您可以摄取这些数据以创建列格式,供 Athena 处理。

于 2017-05-10T21:02:40.053 回答