0

我正在使用 Elasticsearch DSL,我正在尝试将查询结果用作另一个查询的参数,如下所示:

{
  "query": {
    "bool": {
      "must_not": {
        "terms": {
          "request_id": {
            "query": {
              "match": {
                "processing.message": "OUT Followup Synthesis"
              }
            },
            "fields": [
              "request_id"
            ],
            "_source": false
          }
        }
      }
    }
  }
}

正如您在上面看到的,我正在尝试搜索它们request_id不是request_idswith processing.messageequals to之一的来源OUT Followup Synthesis

我收到此查询错误:

Error loading data [x_content_parse_exception] [1:1660] [terms_lookup] unknown field [query]

如何使用 Elasticsearch DSL 实现我的目标?

4

1 回答 1

1

从评论中提取的原始问题

我正在尝试获取 processing.message 等于“IN Followup Sythesis”的数据,其 request_id 未出现在 processing.message 等于“OUT Followup Sythesis”的数据中。在 SQL 语言中:

SELECT d FROM   data d
WHERE  d.processing.message = 'IN Followup Sythesis'
       AND d.request_id NOT IN (SELECT request_id FROM data WHERE processing.message = 'OUT Followup Sythesis'); 

答:一般来说,Elasticsearch既不支持应用端的连接,也不支持子查询。

因此,您必须运行第一个查询,获取检索到的 ID 并将它们放入第二个查询 - 理想情况下是一个termsquery


当然,这个限制可以通过“劫持”脚本化的度量聚合来克服。

以这3个文件为例:

POST reqs/_doc
{"request_id":"abc","processing":{"message":"OUT Followup Synthesis"}}

POST reqs/_doc
{"request_id":"abc","processing":{"message":"IN Followup Sythesis"}}

POST reqs/_doc
{"request_id":"xyz","processing":{"message":"IN Followup Sythesis"}}

你可以跑

POST reqs/_search
{
  "size": 0,
  "query": {
    "match": {
      "processing.message": "IN Followup Sythesis"
    }
  },
  "aggs": {
    "subquery_mock": {
      "scripted_metric": {
        "params": {
          "disallowed_msg": "OUT Followup Synthesis"
        }, 
        "init_script": "state.by_request_ids = [:]; state.disallowed_request_ids = [];",
        "map_script": """
          def req_id = params._source.request_id;
          def msg = params._source.processing.message;
          
          if (msg.contains(params.disallowed_msg)) {
            state.disallowed_request_ids.add(req_id);
            // won't need this particular doc so continue looping
            return;
          }
          
          if (state.by_request_ids.containsKey(req_id)) {
            // there may be multiple docs under the same ID
            // so concatenate them
            state.by_request_ids[req_id].add(params._source);
          } else {
            // initialize an appendable arraylist
            state.by_request_ids[req_id] = [params._source];
          }
        """,
        "combine_script": """
          state.by_request_ids.entrySet()
            .removeIf(entry -> state.disallowed_request_ids.contains(entry.getKey()));
          return state.by_request_ids
        """,
        "reduce_script": "return states"
      }
    }
  }
}

这只会返回正确的请求:

"aggregations" : {
  "subquery_mock" : {
    "value" : [
      {
        "xyz" : [
          {
            "processing" : { "message" : "IN Followup Sythesis" },
            "request_id" : "xyz"
          }
        ]
      }
    ]
  }
}

⚠️ 这几乎可以保证速度很慢,并且违背了不访问该_source字段的建议指导。但这也表明可以“模拟”子查询。

I'd recommend to test this script on a smaller set of documents before letting it target your whole index — maybe restrict it through a date range query or similar.


FYI Elasticsearch exposes an SQL API, though it's only offered through X-Pack, a paid offering.

于 2021-02-18T22:35:46.523 回答