1

我已经开始使用 NiFi。我正在研究一个将数据加载到 Hive 的用例。我得到一个 CSV 文件,然后我使用 SplitText 将传入的流文件拆分为多个流文件(逐个记录拆分)。然后我使用 ConvertToAvro 将拆分的 CSV 文件转换为 AVRO 文件。之后,我将 AVRO 文件放入 HDFS 的目录中,并使用 ReplaceText + PutHiveQL 处理器触发“加载数据”命令。

我按记录拆分文件记录,因为要获取分区值(因为 LOAD DATA 不支持动态分区)。流程如下所示:

GetFile (CSV) --- SplitText(分割行数:1 和标题行数:1) --- ExtractText(使用 RegEx 获取分区字段的值并分配给属性) --- ConvertToAvro(指定架构)- - PutHDFS(写入 HDFS 位置)--- ReplaceText(加载带有分区信息的数据 cmd)--- PutHiveQL

问题是,由于我一次将 CSV 文件拆分为每条记录,它会生成太多的 avro 文件。例如,如果 CSV 文件有 100 条记录,它会创建 100 个 AVRO 文件。由于我想获取分区值,因此我必须一次将它们拆分为一条记录。我想知道有什么办法,我们可以在不逐记录拆分的情况下实现这个目标。我的意思是像批处理它。我对此很陌生,所以我还无法破解它。帮我解决这个问题。

PS:如果有任何替代方法可以实现此用例,请建议我。

4

2 回答 2

1

您是否希望根据分区的值对 Avro 记录进行分组,每个唯一值一个 Avro 文件?或者您是否只需要一些 LOAD DATA 命令的分区值(并使用包含所有记录的单个 Avro 文件)?

如果是前者,那么您可能需要一个自定义处理器或 ExecuteScript,因为您需要在一个步骤中解析、分组/聚合和转换所有内容(即对于一个 CSV 文档)。如果是后者,那么您可以将流程重新排列为:

GetFile -> ConvertCSVToAvro -> PutHDFS -> ConvertAvroToJSON -> SplitJson -> EvaluateJsonPath -> ReplaceText -> PutHiveQL

此流程将整个 CSV 文件(作为单个 Avro 文件)放入 HDFS,然后进行拆分(在转换为 JSON 之后,因为我们没有 EvaluateAvroPath 处理器),获取分区值,并生成Hive DDL 语句 (LOAD DATA)。

于 2017-02-07T17:59:39.423 回答
0

如果您已将文件放置在 hive 表使用 puthdfs 处理器读取数据的位置,则无需调用 puthiveql 处理器。我对此也很陌生,但我认为您应该利用 hive 的读取模式功能。

于 2018-03-12T07:30:51.287 回答