0

更新所有文件

背景资料

我有一个用例需要更新索引中的所有文档。我的来源类似于以下内容:

{
  'hits': [
   {'_index': 'main-index-v2',
    '_type': '_doc',
    '_id': 'ID_xzeta4955029dhs82901',
    '_score': 8.403202,
    '_source': {'id': 'ID_xzeta4955029dhs82901',
        'employee_ids': ['J98234', 'J28373', 'CH13561', 'J98823', 'J12294'],
        'non_employee_ids': [],
        'friends_id': ['G8667', 'J98923', 'J28373', 'H82739', 'J98823'],
        'local_date': '2022/01/10',
        'local': True,
    ...
} 

我可以使用 multi_match 查询轻松搜索我的索引,但这是针对单个 ID。

def create_multi_query(ids: str, fields: list=['employee_ids', 'non_employee_ids', 'friends_id']):
    return {
        "query": {
            "multi_match": {
                "query": f"{ids}",
                "fields": fields,
                "operator": "or"
            }
        }
    }

hits = es.search(index='main-index-v2', body=create_multi_query('G8667'), scroll='2m')

我想提供一个字典和字段列表作为参数来更新我的索引。

例子:

{'J1234': 'J2875', 'CH1234': 'J2879'}

字典包含 old_ids 到 new_ids。我想更新每个具有旧 ID 的字段。

我的解决方案(到目前为止)

我编写了一个简单的脚本来更新 id,但是它需要每个字段的 for 循环。脚本所做的是逐个循环遍历每个字段。如果列表中的当前项目与我们的参数“fromId”匹配,我们将“toId”附加到列表中,否则将当前项目添加到列表中并继续。然后我们将该字段设置为等于新列表。

无痛脚本示例

def result = [];
for (def item: ctx._source.employee_ids) 
    { 
        if (item == params.fromId) {
        result .add(params.toId)
    } 
    else {
        result .add(item)
    }} ctx._source.employee_ids= result; 

def resultF = [];
for (def item: ctx._source.friends_id) 
    { 
        if (item == params.fromId) {
        resultF .add(params.toId)
    } 
    else {
        resultF .add(item)
    }} ctx._source.friends_id = resultF ; 

这可以通过elasticsearch_dsl库中的 UpdateByQuery 执行。

更新调用的示例。


def partial_update(es, items: dict):
    assert es.ping() is True
    tmp = []
    for from_id, to_id in items.items():
        result = execute_intermediate(from_id, to_id)
        tmp.append(result)
    return tmp

@retry((exceptions.ConflictError, exceptions.ConnectionError, exceptions.RequestError), value_type=dict, tries=3, delay=2, backoff=1)
def execute_intermediate(from_id, to_id):
    from elasticsearch_dsl  import UpdateByQuery
    ubq = UpdateByQuery(
        using=auth_es(),
        doc_type='doc', index=settings.ES_WRITE_INDEX,
    )
    ubq = ubq.script(source=UPDATE_SCRIPT, lang='painless', params={'fromId': from_id, 'toId': to_id})
    ubq = ubq.params(wait_for_completion=True)
    res = ubq.execute().to_dict()
    return res

创建一个中间函数来对单个 ID 执行更新,并使用重试装饰器进行包装。

问题

  1. 这样做需要我一个一个地遍历我的字典来执行更新。

  2. 如果我想增加我们想要更新的字段数量,我需要添加一个新的 for 循环。

问题

根据上述内容更新源中所有字段的最佳/最佳解决方案是什么?

有没有办法发送字典来查找与键匹配的所有文档,并在一次调用中使用值进行更新?

4

1 回答 1

0

对此没有开箱即用的解决方案。

对现有无痛脚本的一项改进是在原地更改数组,同时在参数中使用映射以及要更新的字段列表。

PUT /test_replace_id/
{
  "mappings": {
    "properties": {
      "employee_ids":{
        "type": "keyword"
      }
    }
  }
}

POST /test_replace_id/_doc/1
{
  "employee_ids": ["old1","old2"],
  "frieds_id": "old1"
}

POST /test_replace_id/_update/1
{
  "script": {
    "source": """
      for (t in params.targets){
        if (ctx._source[t] instanceof List){
          for (int j=0; j<ctx._source[t].length; j++){
            if (params.map.containsKey(ctx._source[t][j])) {
              ctx._source[t][j] = params.map.get(ctx._source[t][j])
            }
          }
        }else{
          if (params.map.containsKey(ctx._source[t])) {
            ctx._source[t] = params.map.get(ctx._source[t])
          }
        }
      }
    """,
    "params":{
      "targets": ["employee_ids","frieds_id"],
      "map": {"old1":"new1"}
    }
  }
}
GET /test_replace_id/_search

这允许更大的灵活性,并且不需要迭代和更新。我们现在可以一次发送整个请求。

@Tomo_M 寻求解决方案!

于 2022-01-31T19:38:19.763 回答