2

首先,我想让你们知道,我知道 ElasticSearch Scroll API 是如何工作的基本工作逻辑。要使用Scroll API,首先,我们需要使用一些滚动值(如1m )调用search方法,然后它将返回一个_scroll_id ,该 _scroll_id将用于 Scroll 的下一次连续调用,直到所有 doc 在循环内返回。但问题是我只想在多线程的基础上使用相同的进程,而不是串行的。例如:

如果我有 300000 个文档,那么我想以这种方式处理/获取文档

  • 第一个线程将处理最初的100000 个文档
  • 第二个线程将处理接下来的100000 个文档
  • 第三个线程将处理剩余的100000 个文档

所以我的问题是,因为我没有找到任何方法来设置滚动 API 上的from值,我如何才能通过线程加快滚动过程。不以序列化方式处理文件。

我的示例 python 代码

if index_name is not None and doc_type is not None and body is not None:
   es = init_es()
   page = es.search(index_name,doc_type, scroll = '30s',size = 10, body = body)
   sid = page['_scroll_id']
   scroll_size = page['hits']['total']

   # Start scrolling
   while (scroll_size > 0):

       print("Scrolling...")
       page = es.scroll(scroll_id=sid, scroll='30s')
       # Update the scroll ID
       sid = page['_scroll_id']

       print("scroll id: " + sid)

       # Get the number of results that we returned in the last scroll
       scroll_size = len(page['hits']['hits'])
       print("scroll size: " + str(scroll_size))

       print("scrolled data :" )
       print(page['aggregations'])
4

4 回答 4

7

你试过切片卷轴吗?根据链接的文档:

对于返回大量文档的滚动查询,可以将滚动拆分为多个切片,这些切片可以独立使用。

每个滚动都是独立的,可以像任何滚动请求一样并行处理。

我自己没有使用过这个(我需要处理的最大结果集是大约 50k 个文档),但这似乎是您正在寻找的。

于 2018-05-16T17:30:35.707 回答
3

您应该为此使用切片滚动,请参阅https://github.com/elastic/elasticsearch-dsl-py/issues/817#issuecomment-372271460了解如何在 python 中执行此操作。

于 2018-05-17T13:57:43.223 回答
1

我遇到了和你一样的问题,但是文档大小是 140 万。我不得不使用并发方法并使用 10 个线程进行数据写入。

我用Java线程池写了代码,你可以在Python中找到类似的方法。

    public class ControllerRunnable implements  Runnable {
        private String i_res;
        private String i_scroll_id;
        private int i_index;
        private JSONArray i_hits;
        private JSONObject i_result;

        ControllerRunnable(int index_copy, String _scroll_id_copy) {
            i_index = index_copy;
            i_scroll_id = _scroll_id_copy;
        }

        @Override
        public void run(){
            try {
                s_logger.debug("index:{}", i_index );
                String nexturl = m_scrollUrl.replace("--", i_scroll_id);
                s_logger.debug("nexturl:{}", nexturl);
                i_res = get(nexturl);

                s_logger.debug("i_res:{}", i_res);

                i_result = JSONObject.parseObject(i_res);
                if (i_result == null) {
                    s_logger.info("controller thread parsed result object NULL, res:{}", i_res);
                    s_counter++;
                    return;
                }
                i_scroll_id = (String) i_result.get("_scroll_id");
                i_hits = i_result.getJSONObject("hits").getJSONArray("hits");
                s_logger.debug("hits content:{}\n", i_hits.toString());

                s_logger.info("hits_size:{}", i_hits.size());

                if (i_hits.size() > 0) {
                    int per_thread_data_num = i_hits.size() / s_threadnumber;
                    for (int i = 0; i < s_threadnumber; i++) {
                        Runnable worker = new DataRunnable(i * per_thread_data_num,
                                (i + 1) * per_thread_data_num);
                        m_executor.execute(worker);
                    }
                    // Wait until all threads are finish
                    m_executor.awaitTermination(1, TimeUnit.SECONDS);
                } else {
                    s_counter++;
                    return;
                }
            } catch (Exception e) {
                s_logger.error(e.getMessage(),e);
            }
        }
    }
于 2020-01-04T06:51:02.807 回答
0

滚动必须是同步的,这就是逻辑。

您可以使用多线程,这正是 elasticsearch 擅长的原因:并行性。

一个弹性搜索索引,由分片组成,这是你数据的物理存储。分片可以在同一个节点上,也可以不在(更好)。

另一方面,搜索 API 提供了一个非常好的选项_preference:(https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-preference.html

所以回到你的应用程序:

  1. 获取索引分片(和节点)的列表
  2. 按分片创建线程
  3. 在每个线程上进行滚动搜索

瞧!

此外,您可以使用 elasticsearch4hadoop 插件,该插件完全适用于 Spark / PIG / map-reduce / Hive。

于 2018-05-16T17:33:17.773 回答