0

我有数据进入 spooldir,我正在使用水槽将其拾取并进一步转发以进行一些处理。

有些文件不是必需的,所以我在水槽中使用 igonorePattern 属性以避免被拾取。

但问题是,我收到的必需文件和非必需文件数量相同,而且我无法控制传入的数据,因此我必须接受进入 spooldir 的任何内容。

由于我有很多这些不需要的文件,我没有磁盘空间来长时间存储它们。因此,我想知道是否有办法让水槽自动删除这些文件,就像它对所有.COMPLETED文件一样(是的,我正在删除被水槽拾取的文件)

4

1 回答 1

0

Flume Spooling Directory Source 无法删除被忽略的文件。它立即/从不只删除已处理的文件。

有三种方法可以解决这个问题。

首先,您可以明确地解决问题(使用 shell 脚本或任何其他可以找到忽略模式的文件并将其删除的小程序)。在我看来,这不是一个好方法。

其次,您可以编写自己的自定义假脱机目录源并实现 Flume 源接口。对于这种小问题,需要付出很大的努力和艰巨的挑战。

三、滥用解决方案,可以使用Morphline Interceptor。Flume 用户指南的这一部分提到了 Morphline 拦截器。此外,您可能想看看Morphline Reference

拦截器从源头获取事件,做一些处理,最后转发到你知道的通道。

如果您选择第三种解决方案,则必须使用 kite-sdk 来执行此操作。您必须使用 flume-env.sh 将 Cloudera 的 Kite Morphlines Core 依赖项添加到您的 FLUME_CLASSPATH 中,或者只需jar添加$APACHE_FLUME_HOME/lib

在此解决方案中,您的示例 Flume 配置将是:

a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
a1.sources.src-1.interceptors = morph

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /spool/dir
a1.sources.src-1.fileHeader = true
a1.sources.src-1.ignoredPattern = 'whatever'

a1.sources.src-1.interceptors.morph.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.src-1.interceptors.morph.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.src-1.interceptors.morph.morphlineId = morphline1

a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = ch-1
a1.sinks.k1.sink.directory = /roll/dir

然后你可以创建一个自定义的morphline拦截器文件作为$APACHE_FLUME_HOME/conf/morphline.conf

在这个 conf 文件中,您可以根据需要处理什么,只需注意将记录对象返回给子进程。

这也不是一个好的解决方案,但您可以编写 Java 代码来执行 Flume 事务期间的任何进程。在每个事件中,您都可以检查目录,如果您不需要该文件,您可以将其删除。(您必须确保运行 java 进程的用户在此目录中具有权限)

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      {
        readJson { }
      }
      {
        java {
          imports : """
            import java.io.File;
            import java.io.IOException;
          """
          code : """
            try {
                // This code from my flume agent, you may want to use it, but it is not necessary
                // JsonNode rootNode = (JsonNode) record.getFirstValue(Fields.ATTACHMENT_BODY);    

                // You can traverse in the relevant directory
                // and find the ignored pattern manually
                // then you can delete it with java code

                //Second part of my code
                //String rootNodeStr = rootNode.toString();
                //record.put("rootNodeStr", rootNodeStr.getBytes(StandardCharsets.UTF_8));
              }
            } catch (IOException e) {
              logger.error("So sad",e);
            }
            return child.process(record);
          """
        }
      }
      {
        setValues {
          _attachment_body : "@{rootNodeStr}"
        }
      }
    ]
  }
]

我希望它会有所帮助。

于 2018-07-26T15:15:10.433 回答