0

我是 ElasticsearchTemplate 的新手。我想根据我的查询从 Elasticsearch 获取 1000 个文档。我已经使用 QueryBuilder 来创建我的查询,并且它运行良好。我浏览了以下链接,其中指出可以使用扫描和滚动来实现大数据集。

链接一
链接二

我正在尝试在以下代码部分中实现此功能,我已从上面提到的链接之一复制粘贴了该代码。但我收到以下错误:

The type ResultsMapper is not generic; it cannot be parameterized with arguments <myInputDto>.

MyInputDto在我的项目中是一个带有@Document注释的类。一天结束,我只想从 Elasticsearch 检索 1000 个文档。我试图找到size参数,但我认为它不受支持。

String scrollId = esTemplate.scan(searchQuery, 1000, false);
        List<MyInputDto> sampleEntities = new ArrayList<MyInputDto>();
        boolean hasRecords = true;
        while (hasRecords) {
            Page<MyInputDto> page = esTemplate.scroll(scrollId, 5000L,
                    new ResultsMapper<MyInputDto>() {
                        @Override
                        public Page<MyInputDto> mapResults(SearchResponse response) {
                            List<MyInputDto> chunk = new ArrayList<MyInputDto>();
                            for (SearchHit searchHit : response.getHits()) {
                                if (response.getHits().getHits().length <= 0) {
                                    return null;
                                }
                                MyInputDto user = new MyInputDto();
                                user.setId(searchHit.getId());
                                user.setMessage((String) searchHit.getSource().get("message"));
                                chunk.add(user);
                            }
                            return new PageImpl<MyInputDto>(chunk);
                        }
                    });
            if (page != null) {
                sampleEntities.addAll(page.getContent());
                hasRecords = page.hasNextPage();
            } else {
                hasRecords = false;
            }
        }

这里有什么问题?有没有其他选择可以实现这一目标?如果有人能告诉我这个(代码)在后端是如何工作的,我将不胜感激。

4

1 回答 1

5

解决方案 1

如果你想使用ElasticsearchTemplate,使用起来会更简单和可读CriteriaQuery,因为它允许使用setPageable方法设置页面大小。通过滚动,您可以获得下一组数据:

CriteriaQuery criteriaQuery = new CriteriaQuery(Criteria.where("productName").is("something"));
criteriaQuery.addIndices("prods");
criteriaQuery.addTypes("prod");
criteriaQuery.setPageable(PageRequest.of(0, 1000));

ScrolledPage<TestDto> scroll = (ScrolledPage<TestDto>) esTemplate.startScroll(3000, criteriaQuery, TestDto.class);
while (scroll.hasContent()) {
    LOG.info("Next page with 1000 elem: " + scroll.getContent());
    scroll = (ScrolledPage<TestDto>) esTemplate.continueScroll(scroll.getScrollId(), 3000, TestDto.class);
}
esTemplate.clearScroll(scroll.getScrollId());

解决方案 2

如果您想使用org.elasticsearch.client.Client而不是ElasticsearchTemplate,则SearchResponse允许设置要返回的搜索命中数:

QueryBuilder prodBuilder = ...;

SearchResponse scrollResp = client.
        prepareSearch("prods")
        .setScroll(new TimeValue(60000))
        .setSize(1000)
        .setTypes("prod")
        .setQuery(prodBuilder)
        .execute().actionGet();

ObjectMapper mapper = new ObjectMapper();
List<TestDto> products = new ArrayList<>();

try {
    do {
        for (SearchHit hit : scrollResp.getHits().getHits()) {
            products.add(mapper.readValue(hit.getSourceAsString(), TestDto.class));
        }
        LOG.info("Next page with 1000 elem: " + products);
        products.clear();
        scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                .setScroll(new TimeValue(60000))
                .execute()
                .actionGet();
    } while (scrollResp.getHits().getHits().length != 0);
} catch (IOException e) {
    LOG.error("Exception while executing query {}", e);
}
于 2017-09-05T13:29:01.530 回答