1

我正在尝试解析我们的日志文件并将它们发送到 elasticsearch。问题是我们的 S3 客户端将行注入到包含回车符 (\r) 而不是换行符 (\n) 的文件中。文件输入过滤器的配置使用 '\n' 作为分隔符,与 99% 的数据一致。当我针对这些数据运行 logstash 时,它错过了我真正想要的最后一行。这是因为文件输入过滤器将“\r”字符视为普通文本而不是换行符。为了解决这个问题,我尝试使用变异过滤器将“\r”字符重写为“\n”。mutate 有效,但 Grok 仍将其视为一条大线。和 _grokparsefailure。

我的“正常”日志文件按预期行 Grok。

配置

input {
     file {
             path => "/home/pa_stg/runs/2015-12-09-cron-1449666001/run.log"
             start_position => "beginning"
             sincedb_path => "/data/logstash/sincedb"
             stat_interval => 300
             type => "spark"
     }
}
filter{
     mutate {
             gsub => ["message", "\r", "
"]
     }
     grok {
             match => {"message" => "\A%{DATE:date} %{TIME:time} %{LOGLEVEL:loglevel} %{SYSLOGPROG}%{GREEDYDATA:data}"}
             break_on_match => false
     }
}
output{
     stdout { codec => rubydebug }
}

输入

输入文件中的这个示例说明了这个问题。^M 字符是 vim 显示 '\r' 回车的方式('more' 隐藏了大部分)。我保留了这一行,因此您可以看到整个内容在 linux 和 File Plugin 中显示为单行文本。

^M[Stage 79:=======>                                               (30 + 8) / 208]^M[Stage 79:============>                                          (49 + 8) / 208]^M[Stage 79:=================>                                     (65 + 8) / 208]^M[Stage 79:=====================>                                 (83 + 8) / 208]^M[Stage 79:===========================>                          (105 + 8) / 208]^M[Stage 79:===============================>                      (122 + 8) / 208]^M[Stage 79:====================================>                 (142 + 8) / 208]^M[Stage 79:=========================================>            (161 + 8) / 208]^M[Stage 79:==============================================>       (180 + 6) / 208]^M[Stage 79:==================================================>   (195 + 3) / 208]^M[Stage 79:=====================================================>(206 + 1) / 208]^M                                                                                ^M^M[Stage 86:==============>                                        (55 + 8) / 208]^M[Stage 86:===================>                                   (75 + 8) / 208]^M[Stage 86:==========================>                           (101 + 8) / 208]^M[Stage 86:===============================>                      (123 + 8) / 208]^M[Stage 86:======================================>               (147 + 8) / 208]^M[Stage 86:============================================>         (173 + 6) / 208]^M[Stage 86:==================================================>   (193 + 3) / 208]^M[Stage 86:=====================================================>(205 + 1) / 208]^M                                                                                ^M^M[Stage 93:===================>                                   (74 + 8) / 208]^M[Stage 93:===========================>                          (104 + 8) / 208]^M[Stage 93:==================================>                   (132 + 8) / 208]^M[Stage 93:========================================>             (157 + 9) / 208]^M[Stage 93:================================================>     (186 + 6) / 208]^M[Stage 93:=====================================================>(206 + 2) / 208]^M                                                                                ^M15/12/09 13:03:46 INFO SomethingProcessor$: Something Processor completed
15/12/09 13:04:44 INFO CassandraConnector: Disconnected from Cassandra cluster: int

输出

{
       "message" => "\n[Stage 79:=======>                                               (30 + 8) / 208]\n[Stage 79:============>
                             (49 + 8) / 208]\n[Stage 79:=================>                                     (65 + 8) / 208]\n[Stage 79:===
==================>                                 (83 + 8) / 208]\n[Stage 79:===========================>                          (105 + 8
) / 208]\n[Stage 79:===============================>                      (122 + 8) / 208]\n[Stage 79:====================================>
               (142 + 8) / 208]\n[Stage 79:=========================================>            (161 + 8) / 208]\n[Stage 79:================
==============================>       (180 + 6) / 208]\n[Stage 79:==================================================>   (195 + 3) / 208]\n[St
age 79:=====================================================>(206 + 1) / 208]\n
                  \n\n[Stage 86:==============>                                        (55 + 8) / 208]\n[Stage 86:===================>
                            (75 + 8) / 208]\n[Stage 86:==========================>                           (101 + 8) / 208]\n[Stage 86:====
===========================>                      (123 + 8) / 208]\n[Stage 86:======================================>               (147 + 8)
 / 208]\n[Stage 86:============================================>         (173 + 6) / 208]\n[Stage 86:========================================
==========>   (193 + 3) / 208]\n[Stage 86:=====================================================>(205 + 1) / 208]\n
                                                     \n\n[Stage 93:===================>                                   (74 + 8) / 208]\n[S
tage 93:===========================>                          (104 + 8) / 208]\n[Stage 93:==================================>
   (132 + 8) / 208]\n[Stage 93:========================================>             (157 + 9) / 208]\n[Stage 93:============================
====================>     (186 + 6) / 208]\n[Stage 93:=====================================================>(206 + 2) / 208]\n
                                                                 \n15/12/09 13:03:46 INFO SomethingProcessor$: Something Processor com
pleted",
      "@version" => "1",
    "@timestamp" => "2015-12-09T22:16:52.898Z",
          "host" => "ip-10-252-1-225",
          "path" => "/home/something/pa_stg/runs/2015-12-09-cron-1449666001/run.log",
          "type" => "spark",
          "tags" => [
        [0] "_grokparsefailure"
    ]
}

我需要 grok 来解​​析这一行,因为它是换行符 '\n'。有人知道怎么修这个东西吗?

15/12/09 13:03:46 INFO SomethingProcessor$: Something Processor completed
4

1 回答 1

1

我相信您正在寻找的可能是多线过滤器。

https://www.elastic.co/guide/en/logstash/current/plugins-filters-multiline.html

如果我没记错的话,这个过滤器负责决定日志行是否是新行。例如,我使用它将所有不以“[INFO]”开头的行连接在一起。

    multiline {
            pattern => "^\[%{LOGLEVEL}\]"
            negate => true
            what => "previous"
    }

我希望这会有所帮助

于 2015-12-10T15:15:39.103 回答