更新所有文件
背景资料
我有一个用例需要更新索引中的所有文档。我的来源类似于以下内容:
{
  'hits': [
   {'_index': 'main-index-v2',
    '_type': '_doc',
    '_id': 'ID_xzeta4955029dhs82901',
    '_score': 8.403202,
    '_source': {'id': 'ID_xzeta4955029dhs82901',
        'employee_ids': ['J98234', 'J28373', 'CH13561', 'J98823', 'J12294'],
        'non_employee_ids': [],
        'friends_id': ['G8667', 'J98923', 'J28373', 'H82739', 'J98823'],
        'local_date': '2022/01/10',
        'local': True,
    ...
} 
我可以使用 multi_match 查询轻松搜索我的索引,但这是针对单个 ID。
def create_multi_query(ids: str, fields: list=['employee_ids', 'non_employee_ids', 'friends_id']):
    return {
        "query": {
            "multi_match": {
                "query": f"{ids}",
                "fields": fields,
                "operator": "or"
            }
        }
    }
hits = es.search(index='main-index-v2', body=create_multi_query('G8667'), scroll='2m')
我想提供一个字典和字段列表作为参数来更新我的索引。
例子:
{'J1234': 'J2875', 'CH1234': 'J2879'}
字典包含 old_ids 到 new_ids。我想更新每个具有旧 ID 的字段。
我的解决方案(到目前为止)
我编写了一个简单的脚本来更新 id,但是它需要每个字段的 for 循环。脚本所做的是逐个循环遍历每个字段。如果列表中的当前项目与我们的参数“fromId”匹配,我们将“toId”附加到列表中,否则将当前项目添加到列表中并继续。然后我们将该字段设置为等于新列表。
无痛脚本示例
def result = [];
for (def item: ctx._source.employee_ids) 
    { 
        if (item == params.fromId) {
        result .add(params.toId)
    } 
    else {
        result .add(item)
    }} ctx._source.employee_ids= result; 
def resultF = [];
for (def item: ctx._source.friends_id) 
    { 
        if (item == params.fromId) {
        resultF .add(params.toId)
    } 
    else {
        resultF .add(item)
    }} ctx._source.friends_id = resultF ; 
这可以通过elasticsearch_dsl库中的 UpdateByQuery 执行。
更新调用的示例。
def partial_update(es, items: dict):
    assert es.ping() is True
    tmp = []
    for from_id, to_id in items.items():
        result = execute_intermediate(from_id, to_id)
        tmp.append(result)
    return tmp
@retry((exceptions.ConflictError, exceptions.ConnectionError, exceptions.RequestError), value_type=dict, tries=3, delay=2, backoff=1)
def execute_intermediate(from_id, to_id):
    from elasticsearch_dsl  import UpdateByQuery
    ubq = UpdateByQuery(
        using=auth_es(),
        doc_type='doc', index=settings.ES_WRITE_INDEX,
    )
    ubq = ubq.script(source=UPDATE_SCRIPT, lang='painless', params={'fromId': from_id, 'toId': to_id})
    ubq = ubq.params(wait_for_completion=True)
    res = ubq.execute().to_dict()
    return res
创建一个中间函数来对单个 ID 执行更新,并使用重试装饰器进行包装。
问题
- 这样做需要我一个一个地遍历我的字典来执行更新。 
- 如果我想增加我们想要更新的字段数量,我需要添加一个新的 for 循环。 
问题
根据上述内容更新源中所有字段的最佳/最佳解决方案是什么?
有没有办法发送字典来查找与键匹配的所有文档,并在一次调用中使用值进行更新?