77

我必须在 ElasticSearch 中存储一些消息,并与我的 python 程序集成。现在我尝试存储的消息是:

d={"message":"this is message"}
    for index_nr in range(1,5):
        ElasticSearchAPI.addToIndex(index_nr, d)
        print d

这意味着如果我有 10 条消息,那么我必须重复我的代码 10 次。所以我想做的是尝试制作一个脚本文件或批处理文件。我检查了ElasticSearch Guide,可以使用 BULK API。格式应如下所示:

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }

我所做的是:

{"index":{"_index":"test1","_type":"message","_id":"1"}}
{"message":"it is red"}
{"index":{"_index":"test2","_type":"message","_id":"2"}}
{"message":"it is green"}

我还使用 curl 工具来存储文档。

$ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json

现在我想使用我的 Python 代码将文件存储到 Elastic Search。

4

5 回答 5

144
from datetime import datetime

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

actions = [
  {
    "_index": "tickets-index",
    "_type": "tickets",
    "_id": j,
    "_source": {
        "any":"data" + str(j),
        "timestamp": datetime.now()}
  }
  for j in range(0, 10)
]

helpers.bulk(es, actions)
于 2014-01-14T12:58:58.700 回答
51

虽然@justinachen 的代码帮助我从 py-elasticsearch 开始,但在查看源代码后让我做一个简单的改进:

es = Elasticsearch()
j = 0
actions = []
while (j <= 10):
    action = {
        "_index": "tickets-index",
        "_type": "tickets",
        "_id": j,
        "_source": {
            "any":"data" + str(j),
            "timestamp": datetime.now()
            }
        }
    actions.append(action)
    j += 1

helpers.bulk(es, actions)

helpers.bulk()已经为您进行了细分。分段是指每次发送到服务器的卡盘。如果要减少已发送文档的块,请执行以下操作:helpers.bulk(es, actions, chunk_size=100)

一些方便的入门信息:

helpers.bulk()只是一个包装器,helpers.streaming_bulk但第一个接受一个列表,这使得它很方便。

helpers.streaming_bulk一直基于Elasticsearch.bulk()所以你不必担心选择什么。

所以在大多数情况下,helpers.bulk()应该是你所需要的。

于 2014-04-16T19:30:16.267 回答
41

(这个线程中提到的其他方法使用python列表进行ES更新,这在今天不是一个好的解决方案,特别是当您需要向ES添加数百万数据时)

更好的方法是使用python 生成器-处理数据而不会超出内存或在速度上做出很大影响

下面是一个实际用例的示例片段——将数据从 nginx 日志文件添加到 ES 进行分析。

def decode_nginx_log(_nginx_fd):
    for each_line in _nginx_fd:
        # Filter out the below from each log line
        remote_addr = ...
        timestamp   = ...
        ...

        # Index for elasticsearch. Typically timestamp.
        idx = ...

        es_fields_keys = ('remote_addr', 'timestamp', 'url', 'status')
        es_fields_vals = (remote_addr, timestamp, url, status)

        # We return a dict holding values from each line
        es_nginx_d = dict(zip(es_fields_keys, es_fields_vals))

        # Return the row on each iteration
        yield idx, es_nginx_d   # <- Note the usage of 'yield'

def es_add_bulk(nginx_file):
    # The nginx file can be gzip or just text. Open it appropriately.
    ...

    es = Elasticsearch(hosts = [{'host': 'localhost', 'port': 9200}])

    # NOTE the (...) round brackets. This is for a generator.
    k = ({
            "_index": "nginx",
            "_type" : "logs",
            "_id"   : idx,
            "_source": es_nginx_d,
         } for idx, es_nginx_d in decode_nginx_log(_nginx_fd))

    helpers.bulk(es, k)

# Now, just run it.
es_add_bulk('./nginx.1.log.gz')

这个骨架演示了生成器的用法。如果需要,您甚至可以在裸机上使用它。您可以继续扩展此功能以快速满足您的需求。

Python Elasticsearch 参考这里

于 2016-08-21T21:36:02.390 回答
12

目前我能想到的有两种选择:

1. 为每个实体定义索引名称和文档类型:

es_client = Elasticsearch()

body = []
for entry in entries:
    body.append({'index': {'_index': index, '_type': 'doc', '_id': entry['id']}})
    body.append(entry)

response = es_client.bulk(body=body)

2. 使用方法提供默认索引和文档类型:

es_client = Elasticsearch()

body = []
for entry in entries:
    body.append({'index': {'_id': entry['id']}})
    body.append(entry)

response = es_client.bulk(index='my_index', doc_type='doc', body=body)

适用于:

ES版本:6.4.0

ES python库:6.3.1

于 2018-12-04T14:02:43.730 回答
0

我的工作代码

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch_dsl import connections
import pandas as pd


# initialize list of lists
data = [['tom', 10, 'NY'], ['nick', 15, 'NY'], ['juli', 14, 'NY'], ['akshay', 30, 'IND'], ['Amit', 14, 'IND']]

# Create the pandas DataFrame
df = pd.DataFrame(data, columns = ['Name', 'Age', 'Country'])

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es_client = connections.create_connection(hosts=['http://localhost:9200/'])
def doc_generator(df):
    df_iter = df.iterrows()
    for index, document in df_iter:
        yield {
                "_index": 'age_sample',
                "_type": "_doc",
                "_source": document,
            }

helpers.bulk(es_client, doc_generator(df))

#get data from elastic search
from elasticsearch_dsl import Search
s = Search(index="age_sample").query("match", Name='nick')
于 2022-01-19T09:43:56.987 回答