1

我在服务器上使用 fluentd 来导出日志。

我的配置使用这样的东西来捕获几个日志文件:

<source>
  type tail
  path /my/path/to/file/*/*.log
</source>

可以正确跟踪不同的文件,但是,我还需要一项功能:

路径的两个通配符部分也应该添加到记录中(我们称它们为directoryand filename)。

如果in_tail插件将文件名添加到记录中,我可以编写一个格式化程序来拆分和编辑。

任何我遗漏或改写in_tail我心愿的东西是最好的方法吗?

4

1 回答 1

1

所以,是的。扩展in_tail是要走的路。

我编写了一个新插件,它继承自NewTailInput并使用略有不同的插件,parse_singleline并将parse_multilines路径添加到记录中。

比预期好很多。


2020 年 6 月 3 日更新:我已经挖掘了代码,这是我能用最少的 Ruby 来解决问题。convert_line_to_event_with_path_names根据您的需要进行自定义,以将自定义数据添加到记录中。

module Fluent
  class DirParsingTailInput < NewTailInput
    Plugin.register_input('dir_parsing_tail', self)

    def initialize
      super
    end


    def receive_lines(lines, tail_watcher)
      es = @receive_handler.call(lines, tail_watcher)
      unless es.empty?
        tag = if @tag_prefix || @tag_suffix
                @tag_prefix + tail_watcher.tag + @tag_suffix
              else
                @tag
              end
        begin
          router.emit_stream(tag, es)
        rescue
          # ignore errors. Engine shows logs and backtraces.
        end
      end
    end

    def convert_line_to_event_with_path_names(line, es, path)
      begin
        directory = File.basename(File.dirname(path))
        filename = File.basename(path, ".*")
        line.chomp!  # remove \n
        @parser.parse(line) { |time, record|
          if time && record
            if directory != "logs"
              record["parent"] = directory
              record["child"] = filename
            else
              record["parent"] = filename
            end 
            es.add(time, record)
          else
            log.warn "pattern not match: #{line.inspect}"
          end
        }
      rescue => e
        log.warn line.dump, :error => e.to_s
        log.debug_backtrace(e.backtrace)
      end
    end

    def parse_singleline(lines, tail_watcher)
      es = MultiEventStream.new
      lines.each { |line|
        convert_line_to_event_with_path_names(line, es, tail_watcher.path)
      }
      es
    end

    def parse_multilines(lines, tail_watcher)
      lb = tail_watcher.line_buffer
      es = MultiEventStream.new
      if @parser.has_firstline?
        lines.each { |line|
          if @parser.firstline?(line)
            if lb
              convert_line_to_event_with_path_names(lb, es, tail_watcher.path)
            end
            lb = line
          else
            if lb.nil?
              log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
            else
              lb << line
            end
          end
        }
      else
        lb ||= ''
        lines.each do |line|
          lb << line
          @parser.parse(lb) { |time, record|
            if time && record
              convert_line_to_event_with_path_names(lb, es, tail_watcher.path)
              lb = ''
            end
          }
        end
      end
      tail_watcher.line_buffer = lb
      es
    end
  end
end
于 2015-05-11T13:56:47.263 回答