12

我正在尝试重新索引我的弹性搜索设置,目前正在查看弹性搜索文档使用 Python API 的示例

不过,我有点困惑这一切是如何运作的。我能够从 Python API 获得滚动 ID:

es = Elasticsearch("myhost")

index = "myindex"
query = {"query":{"match_all":{}}}
response = es.search(index= index, doc_type= "my-doc-type", body= query, search_type= "scan", scroll= "10m")

scroll_id = response["_scroll_id"]

现在我的问题是,这对我有什么用?知道滚动 ID 甚至能给我带来什么?文档说要使用“批量 API”,但我不知道 scoll_id 是如何影响这一点的,这有点令人困惑。

考虑到我正确获得了scroll_id,谁能给出一个简短的例子来说明我如何从这一点重新索引?

4

4 回答 4

9

这是使用 elasticsearch-py 重新索引到另一个弹性搜索节点的示例:

from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])

helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des)

您还可以将查询结果重新索引到不同的索引,这是如何做到的:

from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])

body = {"query": {"term": {"year": "2004"}}}
helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des, query=body)
于 2016-01-14T13:30:39.127 回答
7

您好,您可以使用滚动 API 以最有效的方式浏览所有文档。使用 scroll_id 您可以找到存储在服务器上的特定滚动请求的会话。因此,您需要为每个请求提供 scroll_id 以获得更多项目。

批量 api 用于更有效地索引文档。复制和索引时,您需要两者,但它们并不真正相关。

我确实有一些 Java 代码可以帮助您更好地了解它是如何工作的。

    public void reIndex() {
    logger.info("Start creating a new index based on the old index.");

    SearchResponse searchResponse = client.prepareSearch(MUSIC_INDEX)
            .setQuery(matchAllQuery())
            .setSearchType(SearchType.SCAN)
            .setScroll(createScrollTimeoutValue())
            .setSize(SCROLL_SIZE).execute().actionGet();

    BulkProcessor bulkProcessor = BulkProcessor.builder(client,
            createLoggingBulkProcessorListener()).setBulkActions(BULK_ACTIONS_THRESHOLD)
            .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
            .setFlushInterval(createFlushIntervalTime())
            .build();

    while (true) {
        searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
                .setScroll(createScrollTimeoutValue()).execute().actionGet();

        if (searchResponse.getHits().getHits().length == 0) {
            logger.info("Closing the bulk processor");
            bulkProcessor.close();
            break; //Break condition: No hits are returned
        }

        for (SearchHit hit : searchResponse.getHits()) {
            IndexRequest request = new IndexRequest(MUSIC_INDEX_NEW, hit.type(), hit.id());
            request.source(hit.sourceRef());
            bulkProcessor.add(request);
        }
    }
}
于 2014-10-14T22:17:07.780 回答
5

对于遇到此问题的任何人,您可以使用 Python 客户端中的以下 API 重新索引:

https://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.reindex

这将帮助您避免滚动和搜索以获取所有数据并使用批量 API 将数据放入新索引。

于 2015-05-14T23:57:14.153 回答
0

重新索引的最佳方法是使用 Elasticsearch 的内置重新索引API,因为它得到很好的支持并且对已知问题具有弹性。

Elasticsaerch Reindex API 批量使用滚动和批量索引,并允许对数据进行脚本转换。在 Python 中,可以开发类似的例程:

#!/usr/local/bin/python
from elasticsearch import Elasticsearch
from elasticsearch import helpers

src = Elasticsearch(['localhost:9202'])
dst = Elasticsearch(['localhost:9200'])

body = {"query": { "match_all" : {}}}

source_index='src-index'
target_index='dst-index'
scroll_time='60s'
batch_size='500'

def transform(hits):
    for h in hits:
        h['_index'] = target_index
        yield h

rs = src.search(index=[source_index],
        scroll=scroll_time,
        size=batch_size,
        body=body
   )

helpers.bulk(dst, transform(rs['hits']['hits']), chunk_size=batch_size)

while True:
    scroll_id = rs['_scroll_id']
    rs = src.scroll(scroll_id=scroll_id, scroll=scroll_time)
    if len(rs['hits']['hits']) > 0:
        helpers.bulk(dst, transform(rs['hits']['hits']), chunk_size=batch_size)
    else:
        break;
于 2019-06-17T20:12:36.867 回答