0

我正在使用 NiFi 使用专有处理工具(在 NiFi外部运行)来编排大型二进制文件的处理。

NiFi 将源文件放在磁盘上,我调用外部工具(使用 ExecuteScript 处理器),该工具加载二进制文件并继续生成许多较小的文件。

当外部工具完全完成后,我需要“拾取”较小(生成)文件的目录并继续通过 NiFi 处理。我需要等待,因为 [输出目录]、[文件数] 和 [处理所需时间] 是动态的。

问题:

  1. GetFile(获取目录)没有上游连接,所以我无法在处理完成后触发它。
  2. ListFile + FetchFile 组合不起作用 b/c ListFile 没有上游连接,所以 - 再次 - 我无法在处理完成后触发它。

...那么,在完成二进制处理后,我可以使用什么处理器来获取文件的目录并将它们带入 NiFi 领域?

4

2 回答 2

1

我将假设您的外部工具有办法在完成时通知 NiFi,因为即使 GetFile 或 ListFile 支持传入的流文件,您也需要它。

那么,两步过程如何...

外部工具写入 directory-1,完成后调用由 HandleHttpRequest 处理器提供的 REST API,然后转到调用“mv directory-1 directory-2”的 ExecuteScript 处理器。

ListFile 处理器一直在监视目录 2,但在执行上面的移动命令之前什么都看不到。

于 2018-08-16T16:47:49.637 回答
0

与@Bryan Bende 的回答有些一致,我最终使用ExecuteScript处理器创建了一个提供上游连接的“ListFile”处理器:

import java.nio.charset.StandardCharsets
import groovy.io.FileType
def flowFile = session.get()
if(!flowFile) return
def flowFiles = [] as List<FlowFile>
def fetchDirectory = flowFile.getAttribute('fetchDirectory')
def listOfFiles = []
def dir = new File(fetchDirectory)
if(dir.exists()) {
   dir.eachFileRecurse (FileType.FILES) { file ->
      listOfFiles << file
   }
}
listOfFiles.each { i ->
   def newFlowFile = session.create()
   session.putAttribute(newFlowFile, 'path', i.path)
   session.putAttribute(newFlowFile, 'filename', i.getName())
   flowFiles << newFlowFile
}
session.remove(flowFile)
session.transfer(flowFiles, REL_SUCCESS)

因此,当外部工具完成时,我将块的 FlowFile 路由到上述处理器,然后我将其通过管道传输到FetchFile处理器。

于 2018-08-17T16:00:17.597 回答