1

我正在从 SFTP 站点读取 CSV 文件并使用 Nifi 将其加载到 mysql db。

我有以下工作流程,似乎工作正常。在开始加载数据之前,我只需要一些帮助来弄清楚如何截断表。

尼菲流:

ListSFTP -> FetchSFTP -> InferAvroSchema -> ConvertCSVtoAvro -> ConvertAvrotoJSON -> SplitJSON -> ConvertJSONtoSQL -> PutSQL

这个流程似乎工作正常,但是每次我运行它时,我都需要先截断表,然后再开始加载。

有人可以帮我提供一些关于我如何实现这一目标的信息。或者有没有比我写的更好的流程,请指教。

谢谢,阿迪尔

4

1 回答 1

3

在进入“插入前截断”部分之前,我建议使用PutDatabaseRecord替换从 ConvertCSVtoAvro 到 PutSQL 的所有内容。它基本上一起执行“ConvertXtoSQL->PutSQL”,并减轻了所有这些转换的需要,只是为了执行 SQL 语句。

插入部分之前的截断有点棘手。从通过NIFI-4522的 NiFi 1.5.0(在撰写本文时尚未发布)开始,您将能够在执行 SQL 命令时保留流文件内容。这意味着在 PutDatabaseRecord 之前,您可以将 PutSQL 的“SQL 语句”设置为“TRUNCATE myTable”。PutSQL 将发出 TRUNCATE 但传递流文件。

目前我能想到的唯一解决方法是 ExecuteScript 处理器。如果使用 PutDatabaseRecord,您可能必须按名称获取 DBCPConnectionPool(有关示例,请参阅我的博客文章)并自己执行 TRUNCATE 语句,然后传递传入的流文件。如果将上面的流程与 PutSQL 一起使用,您可以获得传入的流文件,然后仅当 fragment.index 为零并且您在连接上使用优先级时才写出“TRUNCATE myTable”流文件,然后传输传入的流文件。

于 2017-12-29T15:59:38.157 回答