1

在 spring-batch 中,是否支持 Elasticsearch ItemReader,使用扫描和滚动功能?我确实看到了这个扩展,但这是基于正常的 spring-data 搜索查询。有一个基于扫描和滚动功能的会很好,因为批处理作业主要需要处理大量数据。谢谢。

4

2 回答 2

1

虽然 ElasticSearch 没有“本机”ItemReader实现,但 Spring Batch 确实提供了一个RepositoryItemReader包装 Spring Data 的方法PagingAndSortingRepository。有了这个,您可以使用 Spring Data ElasticSearch 项目提供的 ElasticSearch 存储库定义。

您可以在此处阅读有关RepositoryItemReaderSpring Batch 文档的更多信息:http: //docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/item/data/RepositoryItemReader.html

您可以在此处阅读有关 Spring Data ElasticSearch 项目的更多信息:http: //docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/

于 2015-05-08T09:15:53.963 回答
1
import java.util.Iterator;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;

public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> {
    private ElasticsearchOperations elasticsearchOperations;
    private final SearchQuery searchQuery;
    private String scrollId;
    private int scrollTimeinMillis = 60000;
    private Class<T> type;

    public ElasticsearchItemReader(
        final ElasticsearchOperations elasticsearchOperations,
        final SearchQuery searchQuery,
        final Class<T> type
    ) {
        this.elasticsearchOperations = elasticsearchOperations;
        this.searchQuery = searchQuery;
        this.type = type;
    }

    @Override
    protected void doOpen() throws Exception {
        scrollId = elasticsearchOperations.scan(searchQuery, scrollTimeinMillis, false);
    }

    @Override
    protected Iterator<T> doPageRead() {
        return elasticsearchOperations.scroll(scrollId, scrollTimeinMillis, type).iterator();
   }
}
于 2017-11-17T19:45:01.793 回答