0

我有两个索引“type1”和“type2”。我能够丰富在两个索引中包含相同“MSGID”字段的一对一文档,因为我们有数千个文档,丰富过程非常缓慢。

我尝试了批量导入的选项,但不知道如何将它应用到现有脚本中,该脚本在两个索引的 1 对 1 文档浓缩上工作正常。

谢谢你。

蟒蛇脚本-

from elasticsearch import Elasticsearch, RequestsHttpConnection, Urllib3HttpConnection, ElasticsearchException
import json
import uuid
from elasticsearch.connection.http_urllib3 import VERIFY_CERTS_DEFAULT
import requests
import subprocess
import paramiko
from requests.exceptions import HTTPError
import elasticsearch.helpers
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionTimeout
from flask import request, Flask, Response, json
from flask_cors import CORS
import pandas as pd
import string

app = Flask(__name__)
CORS(app)


es_host = "<>"
es_port = "<>"
es_uname = "<>"
es_pwd = "<>"
es_type1 = "type1"
es_type2 = "type2"


def makeConnection():
    elastic = Elasticsearch([{'host': es_host, 'port': es_port}], http_auth=(
        es_uname, es_pwd), scheme="https", verify_certs=False)
    return elastic


elastic = makeConnection()
# check index


def enrichment_data(message_id):
    data = elastic.search(index="type2", body={
        "size": 1,
        "sort": {"event_timestamp": "desc"},
        "query":
            {
                "match": {
                    "MSGID.keyword": str(message_id)
                }
        }})
    if data['hits']['total']['value']!=0:
        ABC = data['hits']['hits'][0]['_source']['ABC']
        XYZ=data['hits']['hits'][0]['_source']['XYZ']
        metadata = {"ABC": ABC,
                "XYZ": XYZ
                }
    else:
        metadata={}
        pass
    return metadata


def enrich_data():
    get_data = elastic.search(index=type1, body={
        "query": {
            "bool": {
                "must": [
                    {
                        "exists": {
                            "field": "MSGID"
                        }
                    }
                ]
            }
        }
    }
    )
#    print(get_data)
    if get_data['hits']['total']['value'] != 0:
        for data in get_data['hits']['hits']:
 #           print(data)
            message_id = data['_source']['MSGID']
  #          print(message_id)
            metadata = enrichment_data(message_id)
            try:
                data['_source']['ABC'] = metadata['ABC']
                data['_source']['XYZ'] = metadata['XYZ']


            except:
                pass

            body_json = data['_source']
            elastic.index(index=data['_index'], doc_type=data['_type'],
                          id=data['_id'], body=body_json)
            elastic.indices.refresh(index=data['_index'])
    return


if __name__ == '__main__':

    enrich_data()

注意 - 我不是在这里寻找丰富处理器作为数据丰富的选项。

4

0 回答 0