1
    // Search request body combining a geo_shape filter with a point-in-time (PIT).
    // NOTE(review): `$polygon` is a placeholder/variable supplied elsewhere —
    // presumably an array of [lon, lat] pairs forming a closed ring; confirm
    // against the caller.
    const body = {
  query: {
    geo_shape: {
      // `geometry` is the name of the indexed geo_shape field being filtered.
      geometry: {
        relation: 'within',
        shape: {
          type: 'polygon',
          // GeoJSON polygons take an array of rings, hence the extra wrapping [].
          coordinates: [$polygon],
        },
      },
    },
  },
  // PIT sits at the top level of the body (sibling of `query`), and the
  // request must NOT also specify an index — the PIT already pins one.
  pit: {
    id:  "t_yxAwEPZXNyaS1wYzYtMjAxN3IxFjZxU2RBTzNyUXhTUV9XbzhHSk9IZ3cAFjhlclRmRGFLUU5TVHZKNXZReUc3SWcAAAAAAAALmpMWQkNwYmVSeGVRaHU2aDFZZExFRjZXZwEWNnFTZEFPM3JReFNRX1dvOEdKT0hndwAA",
    keep_alive: "1m",
  },
};

查询失败,onBody 处出现 search_phase_execution_exception;不使用 PIT(时间点)时查询工作正常,但我需要检索超过 10000 条命中结果(hits)。

4

2 回答 2

3

好吧,在 NodeJS ElasticSearch 的客户端中使用 PIT 并不清楚,或者至少没有很好的文档记录。您可以使用客户端创建 PIT,例如:

// Open a point-in-time (PIT) on the index; the returned id is what later
// search requests use to page over a stable snapshot of the data.
const pitRes = await elastic.openPointInTime({
  index: index,
  keep_alive: "1m"
});

// FIX: the original assigned `pit_id` without declaring it, creating an
// implicit global (a ReferenceError in strict mode / ES modules).
const pit_id = pitRes.body.id;

但是没有找到在 search 方法中使用该 pit_id 的办法,而且官方文档对此也没有正确说明 :S

但是,您可以按如下方式使用滚动 API:

// Scroll through all matching documents with the client's scroll helper,
// which returns an async iterable of result pages.
const scrollSearch = await elastic.helpers.scrollSearch({
  index: index,
  body: {
    size: 10000,
    query: {
      query_string: {
        fields: ["vm_ref", "org", "vm"],
        query: organization + moreQuery,
      },
    },
    // FIX: in the original, "sort" was nested INSIDE the "query" object,
    // which is invalid query DSL — it must be a top-level sibling of "query".
    sort: [
      { utc_date: "desc" },
    ],
  },
});

然后读取结果如下:

// Accumulate every hit from every scroll page into one flat array.
let res = [];

try {
  for await (const page of scrollSearch) {
    for (const hit of page.body.hits.hits) {
      res.push(hit);
    }
  }
} catch (err) {
  console.log(err);
}

我知道这不是您问题的确切答案,但我希望它有所帮助;)

于 2021-05-07T11:05:33.210 回答
1

现在 ElasticSearch 官方文档已经记录了如何使用时间点(PIT)对搜索结果进行分页。您可以在这里找到比较详细的解释:Paginate search results

我准备了一个示例,可以提供有关如何实现工作流的想法,文档中对此进行了描述:

/**
 * Page through all documents of an index using the Point-In-Time (PIT) API
 * together with `search_after`, as described in the Elasticsearch
 * "Paginate search results" documentation.
 *
 * @param {string} cluster   - Elasticsearch node URL for the Client.
 * @param {string} index     - Index (or index pattern) to open the PIT on.
 * @param {number} [chunkSize=5000] - Documents fetched per request.
 * @param {string} [keepAlive="1m"] - PIT keep-alive extension per request.
 */
async function searchWithPointInTime(cluster, index, chunkSize, keepAlive) {
    if (!chunkSize) {
        chunkSize = 5000;
    }
    if (!keepAlive) {
        keepAlive = "1m";
    }

    const client = new Client({ node: cluster });
    let pointInTimeId = null;
    let searchAfter = null;

    try {
        // Open point in time
        pointInTimeId = (await client.openPointInTime({ index, keep_alive: keepAlive })).body.id;

        // Query next chunk of data
        while (true) {
            const response = await client.search({
                // Pay attention: no index here (because it will come from the point-in-time)
                body: {
                    size: chunkSize,
                    track_total_hits: false, // This will make the query faster
                    query: {
                        // (1) TODO: put any filter you need here (instead of match_all)
                        match_all: {},
                    },
                    pit: {
                        id: pointInTimeId,
                        keep_alive: keepAlive,
                    },
                    // Sorting should be by _shard_doc or at least include _shard_doc
                    sort: [{ _shard_doc: "desc" }],
                    // Very important - tells Elastic to bring the next portion
                    // (only sent from the second request onwards)
                    ...(searchAfter !== null && { search_after: [searchAfter] }),
                },
            });

            const { hits } = response.body.hits;
            if (!hits || !hits.length) {
                break; // No more data
            }

            // FIX: declare the loop variable — the original `for (hit of hits)`
            // leaked an implicit global.
            for (const hit of hits) {
                // (2) TODO: Do whatever you need with results
            }

            // A short page means we have read all the data.
            // FIX: the original compared against `size`, which was computed from
            // an undeclared `remained` variable and threw a ReferenceError.
            if (hits.length < chunkSize) {
                break;
            }

            // Get next value for the 'search after' position
            // by extracting the _shard_doc from the sort key of the last hit
            searchAfter = hits[hits.length - 1].sort[0];
        }
    } catch (ex) {
        console.error(ex);
    } finally {
        // Close point in time.
        // FIX: the original referenced an undefined name `pointInTime`, so the
        // PIT was never closed (and the finally block itself threw).
        if (pointInTimeId) {
            await client.closePointInTime({ body: { id: pointInTimeId } });
        }
    }
}
于 2021-11-08T02:47:57.683 回答