0

我正在创建端到端流,通过使用消费 Kafka 来处理通过 tealium 事件流接收的 Json 文件,从而将数据消费到 HDFS 中。

目前,我已经使用

Consume Kafka -> Evaluate Json Path -> Jolttransform Json -> Merge Content -> Evaluate Json Path -> Update attribute -> PutHDFS ->MoveHDFS

要求是将全天假脱机的 JSON 数据读取到单个文件中,引用属性 postdate(将时代转换为YYYYMMDDSS之前的时间戳)并每天读取数据以合并到单个输出文件中,最后根据与 POST_DATE 字段相关的时间戳重命名文件区分日常文件。

当前日期输出文件夹应仅包含当前日期处理文件,并且所有较早日期的已完成输出文件应移至不同文件夹。

您能否帮助我如何在 MoveHDFS 上递归搜索 hdfs 文件夹并将不等于当前日期的已完成输出文件移动到不同的文件夹中。

4

1 回答 1

0

当前流程运行良好。使用 Kafka -> 评估 Json 路径 -> Jolttransform Json -> 合并内容 -> 评估 Json 路径 -> 更新属性 -> PutHDFS ---> 创建合并文件。

完成上述流程后创建一个单独的流程以接收处理后的合并文件并使用 listhdfs->fethchdfs->updateattribute->puthdfs 重新处理它

在 listhdfs 中,设置消耗前的最小文件年龄等待时间。这将允许进程递归搜索文件,并使用 updateattribute 根据父文件夹重新创建文件夹以重新使用进程文件。

于 2019-10-24T19:52:56.647 回答