2

我对 Apache NIFI 相当陌生。

我想设置一个流程,其中有一个文件被放入“热文件夹”。如果此文件夹检测到放入其中的文件,则此文件将被放入另一个名为“输入”的文件夹中。一旦文件被复制到输入文件夹中,我想触发一个 Java 程序来运行。

我解决这个问题的方法是创建一个“GETFILE”处理器来从热文件夹中获取文件。然后创建一个 PUTFILE 处理器将其放入输入文件夹中。所以你可以想象在'GETFILE'和'PUTFILE'处理器之间有一个连接链接。这按预期工作。

然而,我面临的挑战是,当文件复制到 INPUT 文件夹时(即在 PUTFILE 处理器执行后),触发我的 Java 进程运行。我无法在 PUTFILE 和 EXECUTEPROCESS 处理器之间创建链接(作为告诉 NIFI 在文件从热文件夹复制到输入文件夹后运行 Java 进程的一种方式)。我似乎无法获得连接 PUTFILE 和 EXECUTEPROCESS 处理器之间的连接箭头(因为 NIFI 不允许我这样做)。

根据上面的描述,有没有人可以推荐一种方法来告诉 NIFI 在检测到文件被添加到输入文件夹后触发 Java 应用程序运行?

谢谢。

4

1 回答 1

6

您要做的事情很有意义,我们实际上曾经允许使用该处理器进行类似的操作。事实证明,尽管有足够多的边缘情况,决定如何处理输入流文件变得相当有问题,因此我们有一个当前非常明确的模型,这基本上意味着处理器与 cron 调度相结合是一个花哨的 cron 工具。

因此,我们改为在 NiFi 0.5.0 版本中发布,这应该在几天内发布。其中我们有https://issues.apache.org/jira/browse/NIFI-210,这是一个非常令人兴奋的功能,它允许针对内联流进行脚本编写。ExecuteScript 处理器听起来非常适合您的情况。例如,如果您运行此代码,则可以在数据存在时触发它,并可以等待侦听输出并将其捕获为流文件属性。然后,您甚至可以路由响应的内容等。

def flowFile = session.get()
if (flowFile == null) {
    return;
}
def procout = new StringBuffer(512), procerr = new StringBuffer(512)
def proc = "java -version".execute()
proc.consumeProcessOutput(procout, procerr)
proc.waitForOrKill(1000)
flowFile = session.putAttribute(flowFile, "Process Output", procout.toString())
flowFile = session.putAttribute(flowFile, "Process Error", procerr.toString())
session.transfer(flowFile, REL_SUCCESS)

如果您有更多问题,请告诉我们。

谢谢乔

于 2016-02-05T02:03:23.677 回答