我看到以下 API 将通过 Elasticsearch 中的查询删除 - http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
但是我想对弹性搜索批量 API 做同样的事情,即使我可以使用批量上传文档
es.bulk(body=json_batch)
我不确定如何使用用于弹性搜索的 python 批量 API 通过查询调用删除。
我看到以下 API 将通过 Elasticsearch 中的查询删除 - http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
但是我想对弹性搜索批量 API 做同样的事情,即使我可以使用批量上传文档
es.bulk(body=json_batch)
我不确定如何使用用于弹性搜索的 python 批量 API 通过查询调用删除。
elasticsearch-py
批量 API 允许您通过包含在每条记录中来批量删除记录'_op_type': 'delete'
。但是,如果您想逐个查询删除,您仍然需要进行两个查询:一个是获取要删除的记录,另一个是删除它们。
批量执行此操作的最简单方法是使用 python 模块的scan()
帮助程序,它包装了 ElasticSearch Scroll API,因此您不必跟踪_scroll_id
s。将它与bulk()
helper 一起使用作为 deprecated 的替代品delete_by_query()
:
from elasticsearch.helpers import bulk, scan
bulk_deletes = []
for result in scan(es,
query=es_query_body, # same as the search() body parameter
index=ES_INDEX,
doc_type=ES_DOC,
_source=False,
track_scores=False,
scroll='5m'):
result['_op_type'] = 'delete'
bulk_deletes.append(result)
bulk(elasticsearch, bulk_deletes)
由于_source=False
已通过,因此不会返回文档正文,因此每个结果都非常小。但是,如果你有内存限制,你可以很容易地批量处理:
BATCH_SIZE = 100000
i = 0
bulk_deletes = []
for result in scan(...):
if i == BATCH_SIZE:
bulk(elasticsearch, bulk_deletes)
bulk_deletes = []
i = 0
result['_op_type'] = 'delete'
bulk_deletes.append(result)
i += 1
bulk(elasticsearch, bulk_deletes)
看看 elasticsearch 如何弃用通过查询 API 删除。我使用绑定创建了这个 python 脚本来做同样的事情。首先定义一个 ES 连接:
import elasticsearch
es = elasticsearch.Elasticsearch(['localhost'])
现在,您可以使用它为要删除的结果创建查询。
search=es.search(
q='The Query to ES.',
index="*logstash-*",
size=10,
search_type="scan",
scroll='5m',
)
现在您可以循环滚动该查询。在我们执行此操作时生成我们的请求。
while True:
try:
# Git the next page of results.
scroll=es.scroll( scroll_id=search['_scroll_id'], scroll='5m', )
# Since scroll throws an error catch it and break the loop.
except elasticsearch.exceptions.NotFoundError:
break
# We have results initialize the bulk variable.
bulk = ""
for result in scroll['hits']['hits']:
bulk = bulk + '{ "delete" : { "_index" : "' + str(result['_index']) + '", "_type" : "' + str(result['_type']) + '", "_id" : "' + str(result['_id']) + '" } }\n'
# Finally do the deleting.
es.bulk( body=bulk )
要使用批量 API,您需要确保两件事:
我目前正在使用基于@drs 响应的脚本,但始终使用bulk()助手。它能够通过使用chunk_size
参数从迭代器创建批量作业(默认为 500,有关更多信息,请参见straming_bulk())。
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, bulk
BULK_SIZE = 1000
def stream_items(es, query):
for e in scan(es,
query=query,
index=ES_INDEX,
doc_type=ES_DOCTYPE,
scroll='1m',
_source=False):
# There exists a parameter to avoid this del statement (`track_source`) but at my version it doesn't exists.
del e['_score']
e['_op_type'] = 'delete'
yield e
es = Elasticsearch(host='localhost')
bulk(es, stream_items(es, query), chunk_size=BULK_SIZE)
虽然在操作上等同于许多其他答案,但我发现这种方式更容易获得:
import elasticsearch
from elasticsearch.helpers import bulk
es = elasticsearch.Elasticsearch(['localhost'])
ids = [1,2,3, ...] # list of ids that will be deleted
index = "fake_name" # index where the documents are indexed
actions = ({
"_id": id,
"_op_type": "delete"
} for id in ids)
bulk(client=es, actions=actions, index=index, refresh=True)
# `refresh=True` makes the result immediately available
谢谢,这真的很有用!
我有两个建议:
使用滚动获取下一页结果时,es.scroll(scroll_id=search['_scroll_id'])
应该是_scroll_id
最后一次滚动中返回的结果,而不是搜索返回的结果。Elasticsearch 不会每次都更新滚动 ID,尤其是对于较小的请求(请参阅此讨论),因此此代码可能有效,但并非万无一失。
清除滚动很重要,因为长时间保持搜索上下文打开是有代价的。Clear Scroll API - Elasticsearch API 文档它们最终会在超时后关闭,但如果您的磁盘空间不足,例如,它可以为您省去很多麻烦。
一个简单的方法是在旅途中建立一个滚动 ID 列表(确保消除重复项!),最后清除所有内容。
es.clear_scroll(scroll_id=scroll_id_list)