
I'm using Logstash to import data from CSV files into our Elasticsearch.

During the import, I want to create a new field that combines the values of the other fields. Here is a snippet of my import config:

    input {
      file {
        path => "/data/xyz/*.csv"
        start_position => "beginning"
        ignore_older => 0
        sincedb_path => "/dev/null"
      }
    }

    filter {
      if [path] =~ "csv1" {
        csv {
          separator => ";"
          columns => ["name1", "name2", "name3", "ID"]
        }
        # Concatenate the three name columns into one field
        mutate {
          add_field => {
            "searchfield" => "%{name1} %{name2} %{name3}"
          }
        }
      }
    }

    output {
      if [path] =~ "csv1" {
        elasticsearch {
          hosts => "localhost"
          index => "my_index"
          document_id => "%{ID}"
        }
      }
    }

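This works as intended, but on rows where, for example, `name3` is empty, Logstash writes the literal text `%{name3}` into the new field. Is there a way to add only the values that are not empty?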


1 Answer


I don't think there is a way around checking whether `name3` exists and building your `searchfield` based on that:

    if [name3] {
      mutate {
        id => "with-name3"
        add_field => { "searchfield" => "%{name1} %{name2} %{name3}" }
      }
    } else {
      mutate {
        id => "without-name3"
        add_field => { "searchfield" => "%{name1} %{name2}" }
      }
    }
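
If `name1` or `name2` can be empty as well, one conditional per combination of empty columns quickly gets unwieldy. A different technique, the `ruby` filter, can join only the non-empty values in a single step. This is a sketch, not part of the answer above: it assumes Logstash 5.x or later (where the `event.get`/`event.set` event API exists) and would replace the `mutate` block inside the `if [path] =~ "csv1"` branch, after the `csv` filter:

    ruby {
      # Sketch: read name1..name3, drop missing (nil) or blank values,
      # and set searchfield to the remaining values joined by spaces.
      code => '
        parts = ["name1", "name2", "name3"].map { |f| event.get(f) }
        parts = parts.compact.map(&:to_s).reject { |v| v.strip.empty? }
        event.set("searchfield", parts.join(" ")) unless parts.empty?
      '
    }

When all three values are blank, `searchfield` is simply not added, so no literal `%{...}` placeholder can end up in the document.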

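Alternatively, if I understand your question correctly, you want to send this data to Elasticsearch and have a single searchable field. To avoid duplicating the data in `_source`, you can build the search field with the `copy_to` mapping parameter instead. Your mapping would look like this: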

    {
      "mappings": {
        "doc": {
          "properties": {
            "name1": {
              "type": "text",
              "copy_to": "searchfield"
            },
            "name2": {
              "type": "text",
              "copy_to": "searchfield"
            },
            "name3": {
              "type": "text",
              "copy_to": "searchfield"
            },
            "searchfield": {
              "type": "text"
            }
          }
        }
      }
    }

You can then query that field directly, without the values being duplicated in `_source`.

Update: with `copy_to`, your `logstash.conf` basically looks like this (the `mutate` filter is no longer needed):

    input {
      file {
        path => "/data/xyz/*.csv"
        start_position => "beginning"
        ignore_older => 0
        sincedb_path => "/dev/null"
      }
    }

    filter {
      if [path] =~ "csv1" {
        csv {
          separator => ";"
          columns => ["name1", "name2", "name3", "ID"]
        }
      }
    }

    output {
      if [path] =~ "csv1" {
        elasticsearch {
          hosts => "localhost"
          index => "my_index"
          document_id => "%{ID}"
        }
      }
    }

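Create the Elasticsearch index with the following request before Logstash sends any data to `my_index`. Otherwise Elasticsearch auto-creates the index with a dynamic mapping (without `copy_to`), and this request fails because the index already exists: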

    PUT /my_index/
    {
      "mappings": {
        "doc": {
          "properties": {
            "name1": {
              "type": "text",
              "copy_to": "searchfield"
            },
            "name2": {
              "type": "text",
              "copy_to": "searchfield"
            },
            "name3": {
              "type": "text",
              "copy_to": "searchfield"
            },
            "searchfield": {
              "type": "text"
            }
          }
        }
      }
    }
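
From Elasticsearch 7.0 on, mapping types are no longer accepted in requests by default, so the `doc` level above makes the request fail there. Assuming a 7.x or newer cluster, the equivalent typeless request would be this sketch (same fields, same `copy_to` targets):

    PUT /my_index
    {
      "mappings": {
        "properties": {
          "name1": { "type": "text", "copy_to": "searchfield" },
          "name2": { "type": "text", "copy_to": "searchfield" },
          "name3": { "type": "text", "copy_to": "searchfield" },
          "searchfield": { "type": "text" }
        }
      }
    }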

You can then run a search like this:

    GET /my_index/_search
    {
      "query": {
        "match": {
          "searchfield": {
            "query": "your text"
          }
        }
      }
    }
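
To see how a row with an empty `name3` ends up, you can index a test document by hand and inspect it. This is a sketch assuming the 6.x index created above (hence the `doc` type in the URL); the document ID and the values are made up for illustration:

    PUT /my_index/doc/1?refresh
    {
      "name1": "John",
      "name2": "Smith"
    }

    GET /my_index/doc/1

    GET /my_index/_search
    {
      "query": {
        "match": {
          "searchfield": "smith"
        }
      }
    }

The `GET` returns `_source` exactly as sent, with no `searchfield` and no literal `%{name3}`, because `copy_to` only affects what is indexed, not `_source`. The search still finds the document through `searchfield`, since the standard analyzer of the `text` field lowercases `Smith`. The `?refresh` parameter makes the document visible to search immediately.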

answered 2018-03-14T08:18:36.657