首先,我想让你们知道,我知道 ElasticSearch Scroll API 是如何工作的基本工作逻辑。要使用Scroll API,首先,我们需要使用一些滚动值(如1m )调用search方法,然后它将返回一个_scroll_id ,该 _scroll_id将用于 Scroll 的下一次连续调用,直到所有 doc 在循环内返回。但问题是我只想在多线程的基础上使用相同的进程,而不是串行的。例如:
如果我有 300000 个文档,那么我想以这种方式处理/获取文档
- 第一个线程将处理最初的100000 个文档
- 第二个线程将处理接下来的100000 个文档
- 第三个线程将处理剩余的100000 个文档
所以我的问题是,因为我没有找到任何方法来设置滚动 API 上的from值,我如何才能通过线程加快滚动过程。不以序列化方式处理文件。
我的示例 python 代码
if index_name is not None and doc_type is not None and body is not None:
es = init_es()
page = es.search(index_name,doc_type, scroll = '30s',size = 10, body = body)
sid = page['_scroll_id']
scroll_size = page['hits']['total']
# Start scrolling
while (scroll_size > 0):
print("Scrolling...")
page = es.scroll(scroll_id=sid, scroll='30s')
# Update the scroll ID
sid = page['_scroll_id']
print("scroll id: " + sid)
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
print("scroll size: " + str(scroll_size))
print("scrolled data :" )
print(page['aggregations'])