我正在尝试使用 Python 将大量数据摄取到 Elasticsearch 中。为此,我正在使用批量 API 帮助程序,并且我开发了一个看起来像这样的函数,
def __load(self, docs, index):
try:
# begin load
logging.info("Begin indexing documents")
progress = tqdm.tqdm(unit="docs", total=len(docs))
successes = 0
# load each document and update status
for ok, action in streaming_bulk(
client=self.es_client, index=index, actions=docs,
):
progress.update(1)
successes += ok
logging.info("Indexed %d/%d documents" % (successes, len(docs)))
logging.info("Data successfully loaded to " + index + " index")
return "COMPLETED", len(docs)
except:
return "FAILED", 0
这是实际摄取发生的部分,
for ok, action in streaming_bulk(
client=self.es_client, index=index, actions=docs,
):
progress.update(1)
successes += ok
现在,我的每个文档都包含大量数据(我有几个字段是大字符串),我注意到这个摄取过程非常缓慢。我正在分块摄取数据,索引 10000 个文档需要一分钟多一点的时间。
有没有更有效的方法来做到这一点?我正在努力使这个过程更快。