3

我想将数据从 S3 目录复制到 Amazon ElasticSearch 服务。我已经尝试按照指南进行操作,但不幸的是我正在寻找的部分丢失了。我不知道 lambda 函数本身应该是什么样子(指南中有关此的所有信息都是:“将您的应用程序源代码放在 eslambda 文件夹中。”)。我希望 ES 自动索引文件。

目前我正在尝试

for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    key = urllib.unquote_plus(record['s3']['object']['key'])
    index_name = event.get('index_name', key.split('/')[0])
    object = s3_client.Object(bucket, key)

    data = object.get()['Body'].read()

    helpers.bulk(es, data, chunk_size=100)

但我得到了一个巨大的错误说明 elasticsearch.exceptions.RequestError: TransportError(400, u'action_request_validation_exception', u'Validation Failed: 1: index is missing;2: type is missing;3: index is missing;4: type is missing;5: index is missing;6: type is missing;7: ...

谁能向我解释一下,我该如何设置,以便我的数据从 S3 移动到 ES 并自动映射和自动索引?显然这是可能的,如参考herehere中所述。

4

2 回答 2

0

虽然可以在 Elasticsearch 中自动分配映射,但不会自动生成索引。您必须在 POST 请求中指定索引名称和类型。如果该索引不存在,则 Elasticsearch 将自动创建索引。

根据您的错误,您似乎没有通过索引和类型。

例如,下面是一个简单的 POST 请求如何将记录添加到索引MyIndex和类型MyType,如果它不存在,它将首先创建索引和类型。

curl -XPOST 'example.com:9200/MyIndex/MyType/' \ 
    -d '{"name":"john", "tags" : ["red", "blue"]}'
于 2017-09-30T21:58:10.107 回答
0

我编写了一个脚本来从 S3 下载一个 csv 文件,然后将数据传输到 ES。

  1. 使用 boto3 制作 S3 客户端并从 S3 下载文件
  2. 制作了一个 ES 客户端来连接到 Elasticsearch。
  3. 打开 csv 文件并使用 elasticsearch 中的 helpers 模块将 csv 文件内容插入到弹性搜索中。

主文件

import boto3
from elasticsearch import helpers, Elasticsearch
import csv
import os
from config import *


#S3
Downloaded_Filename=os.path.basename(Prefix)
s3 = boto3.client('s3', aws_access_key_id=awsaccesskey,aws_secret_access_key=awssecretkey,region_name=awsregion)
s3.download_file(Bucket,Prefix,Downloaded_Filename)

#ES
ES_index = Downloaded_Filename.split(".")[0]
ES_client = Elasticsearch([ES_host],http_auth=(ES_user, ES_password),port=ES_port)

#S3 to ES
with open(Downloaded_Filename) as f:
    reader = csv.DictReader(f)
    helpers.bulk(ES_client, reader, index=ES_index, doc_type='my-type')

配置文件

awsaccesskey = ""
awssecretkey = ""
awsregion = "us-east-1"
Bucket=""
Prefix=''
ES_host = "localhost"
ES_port = "9200"
ES_user = "elastic"
ES_password = "changeme"
于 2021-01-06T14:29:14.180 回答