Crying out for help here: when I try to use Logstash to turn MySQL values into a nested Elasticsearch field, I get the following error.

{"exception"=>"expecting List or Map, found class org.logstash.bivalues.StringBiValue", "backtrace"=>["org.logstash.Accessors.newCollectionException(Accessors.java:195)"

With the following config file:

input {
    jdbc {
        jdbc_driver_library => "/logstash/mysql-connector-java-5.1.42-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/data"
        jdbc_user => "username"
        jdbc_password => "password"
        statement => "SELECT id, suggestions, address_count FROM `suggestions` WHERE id <= 100"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
    }
}
filter {
    mutate {
        rename => { 'address_count' => '[suggestions][payload][count]' }
    }
}
output {
    elasticsearch {
        hosts => [
            "localhost:9200"
        ]
        index => "dev_suggestions"
        document_type => "address"
    }
}

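However, if I rename address_count to a field that is not in my mapping, it works fine and the value is correctly added as a nested property. I have tried other fields in my index, not just answers.payloads.address_count, and I hit the same issue: it only works when the field is not defined in the mapping.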

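This is giving me a bit of a headache, and I would be very grateful if anyone could help me get past it, as I have spent the last 48 hours banging my head against the desk!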

I initially assumed I could do the following with the MySQL query:

SELECT id, suggestion, '[suggestions][payload][count]' FROM `suggestions` WHERE id <= 100

Then I also tried

SELECT id, suggestion, 'suggestions.payload.count' FROM `suggestions` WHERE id <= 100

Both failed to insert the value, and the latter option caused an error saying that field names cannot contain dots.

Finally, the mapping:

{
  "mappings": {
    "address": {
      "properties": {
        "suggestions": {
          "type": "completion",
          "payloads" : true
        }
      }
    }
  }
}

Thanks to Val. For future users who, like me, need to turn MySQL data into nested Elasticsearch objects with Logstash, here is a working solution using Logstash 5 and Elasticsearch 2.

input {
    jdbc {
        jdbc_driver_library => "/logstash/mysql-connector-java-5.1.42-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/data"
        jdbc_user => "username"
        jdbc_password => "password"
        statement => "SELECT addrid, suggestion, address_count FROM `suggestions` WHERE id <= 20"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
    }
}

filter {
  ruby {
       code => "
           event.set('[suggestions][input]', event.get('suggestion'))
           event.set('[suggestions][payload][address_count]', event.get('address_count'))
           event.set('[suggestions][payload][id]', event.get('addrid'))
       "
       remove_field => [ 'suggestion', 'address_count', 'addrid' ]
  }
}

output {
    elasticsearch {
        hosts => [
            "localhost:9200"
        ]
        index => "dev_suggestions"
        document_type => "address"
    }
}

1 Answer

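I think you need to go about it differently. First, I would rename the suggestions field in your SQL query to something else, and then build the suggestions object from the values you get back from the SQL query.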

    statement => "SELECT id, suggestion, address_count FROM `suggestions` WHERE id <= 100"
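
To illustrate why renaming the column is likely to help, here is a minimal plain-Ruby sketch. It assumes the error comes from the event already holding `suggestions` as a string, so there is nothing to descend into when writing `[suggestions][payload][count]`. The `set_path` helper and the sample rows are hypothetical; this is not Logstash's actual implementation.

```ruby
# Hypothetical helper, NOT Logstash's real code: walks a field path such as
# ['suggestions', 'payload', 'count'] through a Hash and refuses to descend
# into an existing value that is not a Hash.
def set_path(event, path, value)
  *parents, leaf = path
  target = parents.reduce(event) do |node, key|
    node[key] = {} unless node.key?(key)
    child = node[key]
    raise TypeError, "expecting Map, found #{child.class}" unless child.is_a?(Hash)
    child
  end
  target[leaf] = value
  event
end

# Row shaped like the original query: `suggestions` is already a String,
# so the nested write has nowhere to go and raises.
clash = { 'suggestions' => '10 Downing Street', 'address_count' => 3 }
begin
  set_path(clash, %w[suggestions payload count], clash['address_count'])
rescue TypeError => e
  puts "failed: #{e.message}"
end

# Row shaped like the renamed query: `suggestions` does not exist yet,
# so the nested object can be created freely.
renamed = { 'suggestion' => '10 Downing Street', 'address_count' => 3 }
set_path(renamed, %w[suggestions payload count], renamed['address_count'])
puts renamed['suggestions'].inspect
```

If that reading is right, the Elasticsearch mapping is not what triggers the exception; the clash happens inside the event before anything is sent to the index.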

Then you can use a ruby filter (and remove your mutate filter) to build your suggestions field, like this:

Logstash 2.x code:

ruby {
     code => "
         # use field-reference syntax: event['suggestions'] is nil until it is set
         event['[suggestions][input]'] = event['suggestion']
         event['[suggestions][payload][count]'] = event['address_count']
     "
     remove_field => [ 'suggestion', 'address_count' ]
}

Logstash 5.x code:

ruby {
     code => "
         event.set('[suggestions][input]', event.get('suggestion'))
         event.set('[suggestions][payload][count]', event.get('address_count'))
     "
     remove_field => [ 'suggestion', 'address_count' ]
}
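
If you want to see what the filter does to a single row without starting Logstash, here is a rough plain-Ruby sketch. `SketchEvent` and `nest_suggestion` are hypothetical stand-ins that only mimic the `get`/`set` calls used above with `[a][b]` field references; the real event class does much more.

```ruby
# Hypothetical stand-in for a Logstash event, backed by a plain Hash.
# Only the small get/set/remove surface used by the filter is mimicked.
class SketchEvent
  def initialize(fields)
    @fields = fields
  end

  # '[a][b]' -> ['a', 'b']; a bare name like 'a' -> ['a']
  def self.parse(ref)
    keys = ref.scan(/\[([^\[\]]+)\]/).flatten
    keys.empty? ? [ref] : keys
  end

  # Returns nil when any part of the path is missing.
  def get(ref)
    SketchEvent.parse(ref).reduce(@fields) do |node, key|
      node.is_a?(Hash) ? node[key] : nil
    end
  end

  # Creates missing intermediate Hashes, then writes the leaf.
  def set(ref, value)
    *parents, leaf = SketchEvent.parse(ref)
    target = parents.reduce(@fields) { |node, key| node[key] ||= {} }
    target[leaf] = value
  end

  # Removes a top-level field only (enough for this sketch).
  def remove(name)
    @fields.delete(name)
  end

  def to_hash
    @fields
  end
end

# Same steps as the 5.x filter: copy into the nested object, then drop
# the flat source columns (the job remove_field does in the config).
def nest_suggestion(event)
  event.set('[suggestions][input]', event.get('suggestion'))
  event.set('[suggestions][payload][count]', event.get('address_count'))
  %w[suggestion address_count].each { |name| event.remove(name) }
  event
end

row = SketchEvent.new('id' => 1, 'suggestion' => '10 Downing Street', 'address_count' => 3)
p nest_suggestion(row).to_hash
```

The row ends up with `id` plus a single nested `suggestions` object holding `input` and `payload.count`, which is the shape the completion mapping with payloads expects.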

PS: All of this assumes you are using ES 2.x, since the payload field went away in ES 5.x.

Answered 2017-08-09T04:31:18.863