0

我正在尝试更新现有的 elasticsearch 数据管道,并希望更充分地使用 elasticsearch-dsl。在当前流程中,我们将文档创建为 json 对象,然后使用请求将对象 PUT 到相关的弹性搜索索引。

我现在想使用 elasticsearch-dsl 保存方法,但是当我的对象或文档被构造为 json 时,我很难理解如何做到这一点。

当前流程:

//import_script.py

index = 'objects'
doc = {"title": "A title", "Description": "Description", "uniqueID": "1234"}
doc_id = doc["uniqueID"]
elastic_url = 'http://elastic:changeme@localhost:9200/' + index + '/_doc/ + doc_id

api = ObjectsHandler()
api.put(elastic_url, doc)


//objects_handler.py

class ObjectsHandler():
     def put(self, url, object):
        result = requests.put(url, json=object)
        if result.status_code != requests.codes.ok:
            print(result.text)
            result.raise_for_status()

我不想使用这种 PUT 方法,而是想利用 DSL 中可用的 Document.save 功能,但我无法为我的用例翻译 api 文档中的示例。

我已经修改了我的 ObjectsHandler 以便它可以创建对象索引:

//objects_handler.py

es = Elasticsearch([{'host': 'localhost', 'port': 9200}],
                   http_auth='elastic:changeme')

connections.create_connection(es)

class Object(Document):
    physicalDescription = Text()
    title = Text()
    uniqueID = Text()

    class Index:
        name = 'objects'
        using = es

class ObjectsHandler():

   def init_mapping(self, index):
        Object.init(using=es, index=index)

api.init_mapping(index)当我从导入器脚本调用时,这成功创建了一个索引。

文档以这个作为保存单个文档的示例,其中 Article 相当于我的 Object 类:

# create and save and article
article = Article(meta={'id': 42}, title='Hello world!', tags=['test'])
article.body = ''' looong text '''
article.published_from = datetime.now()
article.save()

我是否可以使用这种方法,但保留我预先构建的 json 对象文档,而不是指定单个属性?我还需要能够指定文档 ID 是文档唯一 ID。

我扩展了我的 ObjectsHandler 以包含一个 save_doc 方法:

def save_doc(self, document, doc_id, index):
        new_obj = Object(meta={'id': doc_id}, 
                  title="hello", uniqueID=doc_id,
                  physicalDescription="blah")
        new_obj.save()

这确实成功地将具有 uniqueID 的对象保存为 id,但我无法使用传递给方法的 json 对象作为document.

4

1 回答 1

0

通过使用 elasticsearch.py​​ 批量助手而不是 elasticsearch-dsl,我在这方面取得了一些成功。以下资源非常有用:

在我的问题中,我指的是:

doc = {"title": "A title", "Description": "Description", "uniqueID": "1234"}

我实际上有一个包含 1 个或多个文档的数组或列表,例如:

documents = [{"title": "A title", "Description": "Description", "uniqueID": "1234"}, {"title": "Another title", "Description": "Another description", "uniqueID": "1235"}]

我为批量导入构建了一个主体并附加了 ID:

for document in documents:
   bulk_body.append({'index': {'_id': document["uniqueID"]}})
   bulk_body.append(document)

然后运行我对 helpers.bulk 方法的新调用:

api_handler.save_docs(bulk_body, 'objects')

我的 objects_handler.py 文件看起来像:

//objects_handler.py
from elasticsearch.helpers import bulk

es = Elasticsearch([{'host': 'localhost', 'port': 9200}],
                   http_auth='elastic:changeme')

connections.create_connection(es)

class Object(Document):
    physicalDescription = Text()
    title = Text()
    uniqueID = Text()

    class Index:
        name = 'objects'
        using = es

class ObjectsHandler():

   def init_mapping(self, index):
        Object.init(using=es, index=index)

   def save_docs(self, docs, index):
        print("Attempting to index the list of docs using helpers.bulk()")
        resp = es.bulk(index='objects', body=docs)
        print("helpers.bulk() RESPONSE:", resp)
        print("helpers.bulk() RESPONSE:", json.dumps(resp, indent=4))

这适用于 json 格式的单个文档或多个文档。

于 2020-02-12T17:07:40.673 回答