
So we have an old Elasticsearch index that succumbed to field explosion. We have redesigned the structure of the index to fix this using nested documents. However, we're trying to work out how to migrate the old index data into the new structure. We're currently looking at using Logstash plugins, most notably the aggregate plugin, to try to accomplish this. However, all the examples we can find show how to create nested documents from database calls, not from a field-exploded index. For context, here is an example of what the old index looks like:

"assetID": 22074,
"metadata": {
  "50": {
    "analyzed": "Phase One",
    "full": "Phase One",
    "date": "0001-01-01T00:00:00"
  },
  "51": {
    "analyzed": "H 25",
    "full": "H 25",
    "date": "0001-01-01T00:00:00"
  },
  "58": {
    "analyzed": "50",
    "full": "50",
    "date": "0001-01-01T00:00:00"
  }
}

这是我们希望转换后的数据最终的样子:

"assetID": 22074,
"metadata": [{
    "metadataId": 50,
    "ngrams": "Phase One", //This was "analyzed"
    "alphanumeric": "Phase One", //This was "full"
    "date": "0001-01-01T00:00:00"
  }, {
    "metadataId": 51,
    "ngrams": "H 25", //This was "analyzed"
    "alphanumeric": "H 25", //This was "full"
    "date": "0001-01-01T00:00:00"
  }, {
    "metadataId": 58,
    "ngrams": "50", //This was "analyzed"
    "alphanumeric": "50", //This was "full"
    "date": "0001-01-01T00:00:00"
  }
]
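
For reference, the nested mapping we're migrating to looks roughly like this (the `searchasset` type name matches the output config further down; the field types here are illustrative, not our exact mapping, and analyzers are omitted):

"mappings": {
  "searchasset": {
    "properties": {
      "assetID": { "type": "integer" },
      "metadata": {
        "type": "nested",
        "properties": {
          "metadataId":   { "type": "integer" },
          "ngrams":       { "type": "text" },
          "alphanumeric": { "type": "text" },
          "date":         { "type": "date" }
        }
      }
    }
  }
}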

As a simple example, here's what we've been able to come up with for the aggregate plugin:

input {
  elasticsearch {
    hosts => "my.old.host.name:9266"
    index => "my-old-index"
    query => '{"query": {"bool": {"must": [{"term": {"_id": "22074"}}]}}}'  
    size => 500
    scroll => "5m"
    docinfo => true
  }
}

filter {
   aggregate {
    task_id => "%{id}"

    code => "     
      map['assetID'] = event.get('assetID')
      map['metadata'] ||= []
      map['metadata'] << {
        metadataId => ? //somehow parse the Id out of the exploded field name "metadata.#.full",
        ngrams => event.get('metadata.#.analyzed'),
        alphanumeric => event.get('metadata.#.full'),
        date => event.get('metadata.#.date'),
      }
    "
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']    
  } 

   if "aggregated" not in [tags] {
    drop {}
  }

}

output {
  elasticsearch {
    hosts => "my.new.host:9266"
    index => "my-new-index"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
    action => "update"
  }

  file {
    path => "C:\apps\logstash\logstash-5.6.6\testLog.log"
  }  
}

Obviously the example above is basically just pseudocode, but it's all we've been able to gather from looking through the documentation for Logstash and Elasticsearch, as well as the aggregate filter plugin, and generally Googling them within an inch of their lives.
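
(For what it's worth, if the ID really does have to be parsed out of a flattened field name like "metadata.50.full", that much is a one-liner in plain Ruby; the field name here is just an illustration:)

# Illustrative only: pull the numeric ID out of a flattened field name
# such as "metadata.50.full".
field_name = 'metadata.50.full'
metadata_id = field_name[/\Ametadata\.(\d+)\./, 1].to_i
puts metadata_id # => 50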


2 Answers


You can use the event object, massage it, and then add it to the new index. Something like the below (the Logstash code is untested, so you may find some errors; check the working Ruby code after this section):

filter {
  aggregate {
    task_id => "%{id}"

    code => "
      arr = Array.new()

      map['assetID'] = event.get('assetID')

      metadataObj = event.get('metadata')
      metadataObj.to_hash.each do |key, value|
        transformedMetadata = {}
        transformedMetadata['metadataId'] = key

        value.to_hash.each do |k, v|
          if k == 'analyzed' then
            transformedMetadata['ngrams'] = v
          elsif k == 'full' then
            transformedMetadata['alphanumeric'] = v
          else
            transformedMetadata['date'] = v
          end
        end
        arr.push(transformedMetadata)
      end

      map['metadata'] ||= []
      map['metadata'] += arr  # += (not <<) so we don't end up with a nested array
    "
  }
}
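
One caveat with any aggregate-based approach: the aggregate filter only behaves correctly when Logstash runs with a single pipeline worker, otherwise related events can be processed out of order. So start Logstash with `-w 1` (the config filename here is just a placeholder):

bin/logstash -w 1 -f migrate.conf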

Play around with the above based on your event input and you'll get there. Here's a working example, using the input from your question, for you to play with: https://repl.it/repls/HarshIntelligentEagle

json_data = {"assetID": 22074,
"metadata": {
  "50": {
    "analyzed": "Phase One",
    "full": "Phase One",
    "date": "0001-01-01T00:00:00"
  },
  "51": {
    "analyzed": "H 25",
    "full": "H 25",
    "date": "0001-01-01T00:00:00"
  },
  "58": {
    "analyzed": "50",
    "full": "50",
    "date": "0001-01-01T00:00:00"
  }
}
}

arr = Array.new()
transformedObj = {}
transformedObj["assetID"] = json_data[:assetID]

# Each key of the old metadata hash ("50", "51", ...) is the metadata ID;
# the value holds the analyzed/full/date fields.
json_data[:metadata].to_hash.each do |key, value|
  transformedMetadata = {}
  transformedMetadata["metadataId"] = key

  # Map the old field names onto the new ones.
  value.to_hash.each do |k, v|
    if k == :analyzed then
       transformedMetadata["ngrams"] = v
    elsif k == :full then
       transformedMetadata["alphanumeric"] = v
    else
       transformedMetadata["date"] = v
    end
  end
  arr.push(transformedMetadata)
end
transformedObj["metadata"] = arr

puts transformedObj
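
Running that prints the transformed object, roughly as below (note the IDs come through as Ruby symbols because the sample hash literal uses symbol keys; `key.to_s.to_i` would normalize them to integers):

{"assetID"=>22074, "metadata"=>[
  {"metadataId"=>:"50", "ngrams"=>"Phase One", "alphanumeric"=>"Phase One", "date"=>"0001-01-01T00:00:00"},
  {"metadataId"=>:"51", "ngrams"=>"H 25", "alphanumeric"=>"H 25", "date"=>"0001-01-01T00:00:00"},
  {"metadataId"=>:"58", "ngrams"=>"50", "alphanumeric"=>"50", "date"=>"0001-01-01T00:00:00"}]}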

Answered 2018-02-01T22:06:21.657

In the end, we solved it with Ruby code in the script:

# Must use the input plugin for elasticsearch at version 4.0.2, or it cannot contact a 1.X index
input {
  elasticsearch {
    hosts => "my.old.host.name:9266"
    index => "my-old-index"
    query => '{
      "query": {
        "bool": {
          "must": [
            { "match_all": { } }
          ]
        }
      }
    }' 
    size => 500
    scroll => "5m"
    docinfo => true
  }
}

filter {
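  # Drop Logstash bookkeeping fields so they don't get written into the migrated documents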
  mutate {
    remove_field => ['@version', '@timestamp']
  }
}

#metadata
filter {
  mutate {
    rename => { "[metadata]" => "[metadata_OLD]" }
  }

  ruby {
    code => "
      metadataDocs = []
      metadataFields = event.get('metadata_OLD')

      # Each key of the old hash is the metadata ID; the value holds the fields
      metadataFields.each { |key, value|
        metadataDoc = {
          'metadataId' => key.to_i,
          'date' => value['date']
        }

        if !value['full'].nil?
          metadataDoc['alphanumeric'] = value['full']
        end

        if !value['analyzed'].nil?
          metadataDoc['ngrams'] = value['analyzed']
        end

        metadataDocs << metadataDoc
      }

      event.set('metadata', metadataDocs)
    "
  }

  mutate {
    remove_field => ['metadata_OLD']
  }
}

output {
  elasticsearch {
    hosts => "my.new.host:9266"
    index => "my-new-index"
    document_type => "searchasset"
    document_id => "%{assetID}"
    action => "update"
    doc_as_upsert => true
  }
  file {
    path => "F:\logstash-6.1.2\logs\esMigration.log"
  }  
}
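
A quick sanity check after the run is to compare document counts between the old and new indices (the `_count` API is standard Elasticsearch; hosts and index names as above):

curl "http://my.old.host.name:9266/my-old-index/_count"
curl "http://my.new.host:9266/my-new-index/_count"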
Answered 2018-02-05T20:09:11.990