我有数据进入 spooldir,我正在使用水槽将其拾取并进一步转发以进行一些处理。
有些文件不是必需的,所以我在水槽中使用 igonorePattern 属性以避免被拾取。
但问题是,我收到的必需文件和非必需文件数量相同,而且我无法控制传入的数据,因此我必须接受进入 spooldir 的任何内容。
由于我有很多这些不需要的文件,我没有磁盘空间来长时间存储它们。因此,我想知道是否有办法让水槽自动删除这些文件,就像它对所有.COMPLETED
文件一样(是的,我正在删除被水槽拾取的文件)
我有数据进入 spooldir,我正在使用水槽将其拾取并进一步转发以进行一些处理。
有些文件不是必需的,所以我在水槽中使用 igonorePattern 属性以避免被拾取。
但问题是,我收到的必需文件和非必需文件数量相同,而且我无法控制传入的数据,因此我必须接受进入 spooldir 的任何内容。
由于我有很多这些不需要的文件,我没有磁盘空间来长时间存储它们。因此,我想知道是否有办法让水槽自动删除这些文件,就像它对所有.COMPLETED
文件一样(是的,我正在删除被水槽拾取的文件)
Flume Spooling Directory Source 无法删除被忽略的文件。它立即/从不只删除已处理的文件。
有三种方法可以解决这个问题。
首先,您可以明确地解决问题(使用 shell 脚本或任何其他可以找到忽略模式的文件并将其删除的小程序)。在我看来,这不是一个好方法。
其次,您可以编写自己的自定义假脱机目录源并实现 Flume 源接口。对于这种小问题,需要付出很大的努力和艰巨的挑战。
三、滥用解决方案,可以使用Morphline Interceptor。Flume 用户指南的这一部分提到了 Morphline 拦截器。此外,您可能想看看Morphline Reference
拦截器从源头获取事件,做一些处理,最后转发到你知道的通道。
如果您选择第三种解决方案,则必须使用 kite-sdk 来执行此操作。您必须使用 flume-env.sh 将 Cloudera 的 Kite Morphlines Core 依赖项添加到您的 FLUME_CLASSPATH 中,或者只需将jar添加到$APACHE_FLUME_HOME/lib
在此解决方案中,您的示例 Flume 配置将是:
a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
a1.sources.src-1.interceptors = morph
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /spool/dir
a1.sources.src-1.fileHeader = true
a1.sources.src-1.ignoredPattern = 'whatever'
a1.sources.src-1.interceptors.morph.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.src-1.interceptors.morph.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.src-1.interceptors.morph.morphlineId = morphline1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = ch-1
a1.sinks.k1.sink.directory = /roll/dir
然后你可以创建一个自定义的morphline拦截器文件作为$APACHE_FLUME_HOME/conf/morphline.conf
在这个 conf 文件中,您可以根据需要处理什么,只需注意将记录对象返回给子进程。
这也不是一个好的解决方案,但您可以编写 Java 代码来执行 Flume 事务期间的任何进程。在每个事件中,您都可以检查目录,如果您不需要该文件,您可以将其删除。(您必须确保运行 java 进程的用户在此目录中具有权限)
morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.**"]
commands : [
{
readJson { }
}
{
java {
imports : """
import java.io.File;
import java.io.IOException;
"""
code : """
try {
// This code from my flume agent, you may want to use it, but it is not necessary
// JsonNode rootNode = (JsonNode) record.getFirstValue(Fields.ATTACHMENT_BODY);
// You can traverse in the relevant directory
// and find the ignored pattern manually
// then you can delete it with java code
//Second part of my code
//String rootNodeStr = rootNode.toString();
//record.put("rootNodeStr", rootNodeStr.getBytes(StandardCharsets.UTF_8));
}
} catch (IOException e) {
logger.error("So sad",e);
}
return child.process(record);
"""
}
}
{
setValues {
_attachment_body : "@{rootNodeStr}"
}
}
]
}
]
我希望它会有所帮助。