我们最近开始使用 elasticsearch 高级客户端,我们使用滚动 API 从 ES 获取大量数据。我们看到高 CPU 利用率的模式如下:
它的模式每 30 分钟重复一次。不知道发生了什么。我们在 elasticsearch 中也看到了异常——
[2021-05-12T04:19:29,516][DEBUG][oeasTransportSearchScrollAction] [node-2] [93486247] 无法执行查询阶段 org.elasticsearch.transport.RemoteTransportException: [node-3][10.160.86.222:7550] [indices:data/read/search[phase/query/scroll]] 原因:org.elasticsearch.search.SearchContextMissingException:在 org.elasticsearch.search.SearchService.getExecutor(SearchService.java 中找不到 id [93486247] 的搜索上下文:496) ~[elasticsearch-6.8.9.jar:6.8.9] at org.elasticsearch.search.SearchService.runAsync(SearchService.java:373) ~[elasticsearch-6.8.9.jar:6.8.9] at org .elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:435) ~[elasticsearch-6.8.9.jar:6.8.9] at org.elasticsearch.action.search.SearchTransportService$8.messageReceived(SearchTransportService.java:376) ~ [elasticsearch-6.8.9.jar:6.8.9] 在 org.elasticsearch。action.search.SearchTransportService$8.messageReceived(SearchTransportService.java:373) ~[elasticsearch-6.8.9.jar:6.8.9] at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler$1.doRun(SecurityServerTransportInterceptor.java :250) ~[?:?] at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[elasticsearch-6.8.9.jar:6.8.9]
正在使用的高级客户端代码是官方文档中给出的常用代码-
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
searchRequest.scroll(scroll);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
if (StringUtils.isNotBlank(keyword)) {
LOG.info("Searching for keyword: {}", keyword);
boolQueryBuilder.must(QueryBuilders.multiMatchQuery(keyword, INDEXED_FIELDS));
}
if(StringUtils.isNotBlank(param1)) {
boolQueryBuilder.filter(QueryBuilders.termQuery("param1", param1));
}
if(Objects.nonNull(param1)) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("param1", param1));
}
if(Objects.nonNull(param1)) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("param1", param1));
}
if(Objects.nonNull(param1)) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("param1", param1));
}
if(Objects.nonNull(param1)) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("param1", param1));
}
searchSourceBuilder.query(boolQueryBuilder);
searchRequest.source(searchSourceBuilder);
List<Object1> statuses = new ArrayList<>();
try {
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();
while (searchHits != null && searchHits.length > 0) {
for (SearchHit hit : searchHits) {
Object1 agent = JsonUtil.parseJson(hit.getSourceAsString(),
Object1.class);
statuses.add(agent);
}
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(scroll);
searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
scrollId = searchResponse.getScrollId();
searchHits = searchResponse.getHits().getHits();
}
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
boolean succeeded = clearScrollResponse.isSucceeded();