更新所有文件
背景资料
我有一个用例需要更新索引中的所有文档。我的来源类似于以下内容:
{
'hits': [
{'_index': 'main-index-v2',
'_type': '_doc',
'_id': 'ID_xzeta4955029dhs82901',
'_score': 8.403202,
'_source': {'id': 'ID_xzeta4955029dhs82901',
'employee_ids': ['J98234', 'J28373', 'CH13561', 'J98823', 'J12294'],
'non_employee_ids': [],
'friends_id': ['G8667', 'J98923', 'J28373', 'H82739', 'J98823'],
'local_date': '2022/01/10',
'local': True,
...
}
我可以使用 multi_match 查询轻松搜索我的索引,但这是针对单个 ID。
def create_multi_query(ids: str, fields: list=['employee_ids', 'non_employee_ids', 'friends_id']):
return {
"query": {
"multi_match": {
"query": f"{ids}",
"fields": fields,
"operator": "or"
}
}
}
hits = es.search(index='main-index-v2', body=create_multi_query('G8667'), scroll='2m')
我想提供一个字典和字段列表作为参数来更新我的索引。
例子:
{'J1234': 'J2875', 'CH1234': 'J2879'}
字典包含 old_ids 到 new_ids。我想更新每个具有旧 ID 的字段。
我的解决方案(到目前为止)
我编写了一个简单的脚本来更新 id,但是它需要每个字段的 for 循环。脚本所做的是逐个循环遍历每个字段。如果列表中的当前项目与我们的参数“fromId”匹配,我们将“toId”附加到列表中,否则将当前项目添加到列表中并继续。然后我们将该字段设置为等于新列表。
无痛脚本示例
def result = [];
for (def item: ctx._source.employee_ids)
{
if (item == params.fromId) {
result .add(params.toId)
}
else {
result .add(item)
}} ctx._source.employee_ids= result;
def resultF = [];
for (def item: ctx._source.friends_id)
{
if (item == params.fromId) {
resultF .add(params.toId)
}
else {
resultF .add(item)
}} ctx._source.friends_id = resultF ;
这可以通过elasticsearch_dsl
库中的 UpdateByQuery 执行。
更新调用的示例。
def partial_update(es, items: dict):
assert es.ping() is True
tmp = []
for from_id, to_id in items.items():
result = execute_intermediate(from_id, to_id)
tmp.append(result)
return tmp
@retry((exceptions.ConflictError, exceptions.ConnectionError, exceptions.RequestError), value_type=dict, tries=3, delay=2, backoff=1)
def execute_intermediate(from_id, to_id):
from elasticsearch_dsl import UpdateByQuery
ubq = UpdateByQuery(
using=auth_es(),
doc_type='doc', index=settings.ES_WRITE_INDEX,
)
ubq = ubq.script(source=UPDATE_SCRIPT, lang='painless', params={'fromId': from_id, 'toId': to_id})
ubq = ubq.params(wait_for_completion=True)
res = ubq.execute().to_dict()
return res
创建一个中间函数来对单个 ID 执行更新,并使用重试装饰器进行包装。
问题
这样做需要我一个一个地遍历我的字典来执行更新。
如果我想增加我们想要更新的字段数量,我需要添加一个新的 for 循环。
问题
根据上述内容更新源中所有字段的最佳/最佳解决方案是什么?
有没有办法发送字典来查找与键匹配的所有文档,并在一次调用中使用值进行更新?