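So, yes. Extending in_tail is the way to go. I wrote a new plugin that inherits from NewTailInput and uses slightly modified versions of parse_singleline and parse_multilines to add the path to the record. It worked much better than expected.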
Update, June 3, 2020: I dug up the code, and this is the least Ruby I could use to solve the problem. Customize convert_line_to_event_with_path_names to your needs to add custom data to the record.
module Fluent
  class DirParsingTailInput < NewTailInput
    Plugin.register_input('dir_parsing_tail', self)

    def initialize
      super
    end

    def receive_lines(lines, tail_watcher)
      es = @receive_handler.call(lines, tail_watcher)
      unless es.empty?
        tag = if @tag_prefix || @tag_suffix
                @tag_prefix + tail_watcher.tag + @tag_suffix
              else
                @tag
              end
        begin
          router.emit_stream(tag, es)
        rescue
          # ignore errors. Engine shows logs and backtraces.
        end
      end
    end

    def convert_line_to_event_with_path_names(line, es, path)
      begin
        directory = File.basename(File.dirname(path))
        filename = File.basename(path, ".*")
        line.chomp! # remove \n
        @parser.parse(line) { |time, record|
          if time && record
            if directory != "logs"
              record["parent"] = directory
              record["child"] = filename
            else
              record["parent"] = filename
            end
            es.add(time, record)
          else
            log.warn "pattern not match: #{line.inspect}"
          end
        }
      rescue => e
        log.warn line.dump, :error => e.to_s
        log.debug_backtrace(e.backtrace)
      end
    end

    def parse_singleline(lines, tail_watcher)
      es = MultiEventStream.new
      lines.each { |line|
        convert_line_to_event_with_path_names(line, es, tail_watcher.path)
      }
      es
    end

    def parse_multilines(lines, tail_watcher)
      lb = tail_watcher.line_buffer
      es = MultiEventStream.new
      if @parser.has_firstline?
        lines.each { |line|
          if @parser.firstline?(line)
            if lb
              convert_line_to_event_with_path_names(lb, es, tail_watcher.path)
            end
            lb = line
          else
            if lb.nil?
              log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
            else
              lb << line
            end
          end
        }
      else
        lb ||= ''
        lines.each do |line|
          lb << line
          @parser.parse(lb) { |time, record|
            if time && record
              convert_line_to_event_with_path_names(lb, es, tail_watcher.path)
              lb = ''
            end
          }
        end
      end
      tail_watcher.line_buffer = lb
      es
    end
  end
end
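
To see what the path handling in convert_line_to_event_with_path_names puts into the record, here is a small standalone sketch that runs without Fluentd. The helper name path_fields and the example paths are made up for illustration; only the File.dirname / File.basename logic and the "logs" check are taken from the plugin above.

```ruby
# Standalone sketch of the path-to-field mapping used in the plugin.
# path_fields is a hypothetical helper, not part of Fluentd.
def path_fields(path)
  directory = File.basename(File.dirname(path)) # name of the containing directory
  filename  = File.basename(path, ".*")         # file name without its extension

  if directory != "logs"
    # File sits in a subdirectory: directory becomes parent, file becomes child.
    { "parent" => directory, "child" => filename }
  else
    # File sits directly under "logs": only a parent is set.
    { "parent" => filename }
  end
end

# Assumed layout: .../logs/<parent>/<child>.log or .../logs/<parent>.log
nested = path_fields("/var/log/logs/billing/worker.log")
flat   = path_fields("/var/log/logs/gateway.log")

puts "nested: parent=#{nested['parent']} child=#{nested['child']}"
puts "flat:   parent=#{flat['parent']} child=#{flat['child'].inspect}"
```

If your directory layout differs, this mapping is the part to adapt before moving the change back into convert_line_to_event_with_path_names.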