
I have millions of records in Elasticsearch. Today I realized that some of them are duplicated. Is there a way to remove these duplicate records?

This is my query:

{
    "query": {
        "filtered": {
            "query": {
                "bool": {
                    "must": [
                        { "match": { "sensorId": "14FA084408" } },
                        { "match": { "variableName": "FORWARD_FLOW" } }
                    ]
                }
            },
            "filter": {
                "range": {
                    "timestamp": {
                        "gt": "2015-07-04",
                        "lt": "2015-07-06"
                    }
                }
            }
        }
    }
}

This is what I get back:

{
"took": 2,
"timed_out": false,
"_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
},
"hits": {
    "total": 21,
    "max_score": 8.272615,
    "hits": [
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxVcMpd7AZtvmZcK",
            "_score": 8.272615,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxVnMpd7AZtvmZcL",
            "_score": 8.272615,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxV6Mpd7AZtvmZcN",
            "_score": 8.0957,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxWOMpd7AZtvmZcP",
            "_score": 8.0957,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxW8Mpd7AZtvmZcT",
            "_score": 8.0957,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxXFMpd7AZtvmZcU",
            "_score": 8.0957,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxXbMpd7AZtvmZcW",
            "_score": 8.0957,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxUtMpd7AZtvmZcG",
            "_score": 8.077545,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxXPMpd7AZtvmZcV",
            "_score": 8.077545,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        },
        {
            "_index": "iotsens-summarizedmeasures",
            "_type": "summarizedmeasure",
            "_id": "AU5isxUZMpd7AZtvmZcE",
            "_score": 7.9553676,
            "_source": {
                "id": null,
                "sensorId": "14FA084408",
                "variableName": "FORWARD_FLOW",
                "rawValue": "0.2",
                "value": "0.2",
                "timestamp": 1436047200000,
                "summaryTimeUnit": "DAYS"
            }
        }
    ]
    }
}

As you can see, I have 21 duplicate records for the same day. How can I delete the duplicate records so that only one is kept per day? Thanks.


3 Answers


Do a count (use the Count API for this), then use delete by query with the query size being one less than the count (use delete by query together with the From/Size API to get this).

Count API

From/Size API

Delete By Query API

In this case you should write your query so that it matches only the duplicate records.

Or just query for the ids and call a bulk delete on all of them except one. However, I guess you cannot do that since you don't have IDs. IMHO, I don't see any other clever way to do this.
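
A minimal sketch of the "query the ids, then bulk-delete all but one" idea, assuming a recent Elasticsearch version (the question's filtered query is 1.x-era syntax) and the official Python elasticsearch client; the index name and field values are taken from the question, while the page size of 1000 is just an assumption:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch("http://localhost:9200")
index = "iotsens-summarizedmeasures"

# Fetch the _ids of every matching document for one sensor/variable/day.
# size=1000 is an assumption; it must be large enough to cover all duplicates.
resp = es.search(
    index=index,
    body={
        "size": 1000,
        "_source": False,
        "query": {
            "bool": {
                "must": [
                    {"match": {"sensorId": "14FA084408"}},
                    {"match": {"variableName": "FORWARD_FLOW"}},
                ],
                "filter": {
                    "range": {"timestamp": {"gt": "2015-07-04", "lt": "2015-07-06"}}
                },
            }
        },
    },
)
ids = [hit["_id"] for hit in resp["hits"]["hits"]]

# Keep the first document and bulk-delete the rest.
actions = (
    {"_op_type": "delete", "_index": index, "_id": doc_id} for doc_id in ids[1:]
)
helpers.bulk(es, actions)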

answered 2015-07-07T10:22:57.857

Using an aggregation query, you can find the duplicated fields in your ES index.

For example, to find 3 Uuid values that occur in more than one document (and return at most 5 of the duplicated documents for each Uuid):

curl -XPOST http://localhost:9200/logstash-2017.03.17/_search -d '
 {
  "size": 0,
  "aggs": {
    "duplicateCount": {
      "terms": {
        "field": "Uuid",
        "min_doc_count": 2,
        "size": 3
      },
      "aggs": {
        "duplicateDocuments": {
          "top_hits": {
            "size": 5
          }
        }
      }
    }
  }
}'

From the output you can easily extract the document _ids to delete. With jq:

cat es_response.json | jq -r '.aggregations.duplicateCount.buckets[].duplicateDocuments.hits.hits[]._id'

A naive approach would then be to delete each of them with a DELETE request:

 curl -XDELETE http://localhost:9200/{index}/{document type}/{_id value}

However, this would delete all of the duplicated documents without leaving a single unique copy in the index (which is usually not what you want; see below). Besides, issuing individual DELETE requests is extremely inefficient.

I wrote an es-deduplicator tool that keeps one document from each group of duplicates and deletes the rest via the Bulk API.

That way thousands of documents can be deleted within minutes:

ES query took 0:01:44.922958, retrieved 10000 unique docs
Deleted 232539 duplicates, in total 1093490. Batch processed in 0:00:07.550461, running time 0:09:03.853110
ES query took 0:01:38.117346, retrieved 10000 unique docs
Deleted 219259 duplicates, in total 1312749. Batch processed in 0:00:07.351001, running time 0:10:50.322695
ES query took 0:01:40.111385, retrieved 10000 unique docs

NOTE: when deleting documents in a loop, it is very important to refresh the index after each bulk request; otherwise the next query may return documents that have already been deleted.

Aggregation queries are approximate by design, so it is quite likely that a few documents will be missed (depending on how many shards and nodes you have). With multiple nodes (a typical cluster setup), it is better to query again by the unique field afterwards (and delete any extra copies that remain).
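
Not the es-deduplicator code itself, just a rough sketch of the same idea using the official Python elasticsearch client; the index name and the Uuid field come from the aggregation example above, and the bucket/top-hits sizes are arbitrary assumptions:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch("http://localhost:9200")
index = "logstash-2017.03.17"

# Group documents that share the same Uuid, as in the aggregation query above.
resp = es.search(
    index=index,
    body={
        "size": 0,
        "aggs": {
            "duplicateCount": {
                "terms": {"field": "Uuid", "min_doc_count": 2, "size": 1000},
                "aggs": {
                    "duplicateDocuments": {"top_hits": {"size": 100, "_source": False}}
                },
            }
        },
    },
)

# For each bucket keep the first hit and bulk-delete the remaining copies.
actions = []
for bucket in resp["aggregations"]["duplicateCount"]["buckets"]:
    hits = bucket["duplicateDocuments"]["hits"]["hits"]
    for hit in hits[1:]:
        actions.append({"_op_type": "delete", "_index": hit["_index"], "_id": hit["_id"]})
helpers.bulk(es, actions)

# Refresh so that the next aggregation query does not return documents
# that were just deleted (see the note above).
es.indices.refresh(index=index)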

answered 2017-03-28T15:06:10.587

This is a random idea and may not exactly fit your needs, but it was my first thought when reading your question.

How about reindexing the whole data set using any Elasticsearch client library? While doing that, compute a hash code for each object (I mean document) and set it as the document's id. Any documents whose fields are all identical are reindexed to the same id, so once the reindexing is complete the duplicates are gone.
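
A minimal sketch of that idea with the official Python elasticsearch client; the target index name is hypothetical, and hashing the serialized _source with SHA-1 is just one possible choice:

import hashlib
import json
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch("http://localhost:9200")
source_index = "iotsens-summarizedmeasures"
target_index = "iotsens-summarizedmeasures-dedup"  # hypothetical target index

def actions():
    # Scan every document in the source index.
    for doc in helpers.scan(es, index=source_index, query={"query": {"match_all": {}}}):
        # Hash the whole _source so that documents with identical fields
        # collapse onto the same _id in the target index.
        body = json.dumps(doc["_source"], sort_keys=True).encode("utf-8")
        doc_id = hashlib.sha1(body).hexdigest()
        yield {
            "_op_type": "index",
            "_index": target_index,
            "_id": doc_id,
            "_source": doc["_source"],
        }

helpers.bulk(es, actions())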

answered 2015-07-07T15:14:39.120