2

我维护一个包含大约 5 亿份文档的索引。其中,每个文档都有一个包含 1 到 10 个单词的字符串字段。我想分析每个文档中的这个字段的字数,并将结果存储到相应的文档中的字段“wordCount”中。

我知道这里有 partial_update 功能: ES documentation to partial_update

我想知道是否可以使用脚本化的 partial_update(可能带有高级 Groovy 脚本)来显着提高上述任务的速度。如果是这样,有人可以提示如何开始吗?

目前,我正在使用下面的 python 脚本,但它非常慢(就大数据而言,由于许多网络往返和有效负载大小)

#!/usr/bin/env python
#-*- coding: utf-8 -*-

import elasticsearch
from elasticsearch import helpers
import pyes
from unidecode import unidecode
from datetime import datetime


def getKeywordLength(text):
    text = text.strip()
    return text.count(" ")+1

indices = ["corpus"]

uri2 = "%s:%d" % ("http://localhost", 9200)
connection2 = pyes.ES([uri2], timeout=2000000)
es = elasticsearch.Elasticsearch(timeout=2000000)

def start():
    elasticSearchIndexName = index

    ###build search query to iterate over all records
    squery ='{"sort": [{"timestampUpdated": {"order": "asc","ignore_unmapped": true}}],"query": {"filtered": {"query": {"bool": {"should": [{"query_string": {"query": "*"}}]}}}}}'

    ###fetch a scrolling handle over all records
    items = helpers.scan(es,query=squery.encode('utf8'),index=elasticSearchIndexName,scroll='360s', size='1000', timeout=2000000)

    ###iterate over all records
    for i in items:
        try:
            indexName = i["_index"]
            timestamp = datetime.now().isoformat()
            keyword = i["_source"]["keyword"]
            i["_source"]["keywordLength"] = getKeywordLength(keyword)
            i["_source"]["timestampUpdated"] =  timestamp
            result = connection2.index(i["_source"], indexName, "items", id=i['_id'])
            print result
        except:
            start()
            return
start()
4

2 回答 2

2

当我有大量数据可以批量更新数百万个文档并且负担不起往返时,我通常会使用update-by-query 插件。原理非常简单,它允许您使用查询 DSL 运行查询,并在所有匹配的文档上运行脚本来做任何您喜欢的事情。

在你的情况下,它会是这样的:

curl -XPOST localhost:9200/corpus/update_by_query -d '{
    "query": {
        "match_all": {}
    }, 
    "script": "ctx._source.keywordLength = ctx._source.keyword.split(\" \").size() + 1; ctx._source.timestampUpdated = new Date().format(\"yyyy-MM-dd\");"
}'

另请注意,为了能够运行它,您需要在elasticsearch.yml文件中启用脚本:

# before ES 1.6
script.disable_dynamic: false

# since ES 1.6
script.inline: on
于 2015-09-05T04:02:48.583 回答
0

我只发现了一小部分有关提供给在 ElasticSearch 中运行的 Groovy 脚本的上下文的信息。

基于此,这是设置/更新两个字段的 Groovy 等效项:

ctx._source.keywordLength = ctx._source.keyword.split(' ').size()
ctx._source.timestampUpdated = new Date().format('yyyy-MM-dd')

我无法弄清楚搜索和迭代是如何发挥作用的。

也可能有所帮助。

于 2015-09-04T18:29:59.063 回答