我有一个处于活动状态的 Amazon Elasticsearch 实例,我能够通过 Chrome 中的“Sense”连接和执行语句。但是当我尝试进行批量插入时,它会显示“超时”错误。我一直在尝试通过 Python(批量助手)和 logstash 模块,两种方式都得到相同的错误。
下面是使用的代码
import psycopg2
from elasticsearch import Elasticsearch, helpers
import time
connection = psycopg2.connect(database='dbname', user='username', password='password', host='abc.def.com', port=5432)
es = Elasticsearch('elasticsearchinstance.amazonaws.com', max_retries=3, retry_on_timeout=True, request_timeout='10m')
cursor = connection.cursor()
query = """
select column1,column2,column3 from table
"""
cursor.execute(query)
rows = cursor.fetchall()
dict_list = []
for i in range(len(rows)):
dict_list.append({'_type':'doc', '_index':'es_index', '_id':rows[i][0], 'column2':rows[i][1], 'column3':rows[i][2]})
print len(dict_list)
es.indices.delete(index='es_index', ignore=[400, 404])
time.sleep(2)
mapping = "{\"settings\" : {\"analysis\" : { \"analyzer\" : { \"my_ngram_analyzer\" : { \"tokenizer\" : \"my_ngram_tokenizer\" }},\"tokenizer\" : {\"my_ngram_tokenizer\" : {\"type\" : \"nGram\" , \"min_gram\" : \"2\" , \"max_gram\" : \"50\" }}}}, \"mappings\": { \"doc\": { \"_id\" : { \"path\" : \"id\" }, \"properties\": { \"column2\": { \"type\": \"string\", \"analyzer\": \"my_ngram_analyzer\" }, \"id\": { \"type\": \"long\" }, \"column3\": { \"type\": \"integer\" }}}}}"
es.indices.create(index='es_index', ignore=400, body=mapping)
helpers.bulk(es, dict_list)
通过 Python Bulk helper 得到的错误如下
Traceback (most recent call last):
File "D:\Python\refresh_data.py", line 21, in <module>
es.indices.delete(index='es_index', ignore=[400, 404])
File "C:\Python27\lib\site-packages\elasticsearch\client\utils.py", line 69, in _wrapped
return func(*args, params=params, **kwargs)
File "C:\Python27\lib\site-packages\elasticsearch\client\indices.py", line 198, in delete
params=params)
File "C:\Python27\lib\site-packages\elasticsearch\transport.py", line 307, in perform_request
status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
File "C:\Python27\lib\site-packages\elasticsearch\connection\http_urllib3.py", line 89, in perform_request
raise ConnectionError('N/A', str(e), e)
elasticsearch.exceptions.ConnectionError:
ConnectionError((<urllib3.connection.HTTPConnection object at 0x0000000002C91898>, u'Connection to elasticsearchinstance.amazonaws.com timed out. (connect timeout=10)'))
caused by:
ConnectTimeoutError((<urllib3.connection.HTTPConnection object at 0x0000000002C91898>, u'Connection to elasticsearchinstance.amazonaws.com timed out. (connect timeout=10)'))
Logstash 也有类似的超时错误(用于批量插入)(如果需要,将编辑和更新 logstash 的错误)。
需要帮助来解决 Amazon Elasticsearch Service 的此超时问题。
提前致谢。
编辑:
这是我在 Amazon ES 中执行批量插入时遇到的“Logstash”错误
C:\logstash-1.5.4\bin>logstash agent -f feed_load_amazon_es.conf
io/console not supported; tty will not be manipulated
←[31mFailed to install template: connect timed out {:level=>:error}←[0m
Logstash startup completed
←[31mGot error to send bulk of actions: connect timed out {:level=>:error}←[0m
←[33mFailed to flush outgoing items {:outgoing_count=>3, :exception=>"Manticore::ConnectTimeout",
:backtrace=>["C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:35:in `initialize'",
"org/jruby/RubyProc.java:271:in `call'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:70:in `call'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:245:in `call_once'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:148:in `code'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/http/manticore.rb:71:in `perform_request'",
"org/jruby/RubyProc.java:271:in `call'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/base.rb:190:in `perform_request'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/http/manticore.rb:54:in `perform_request'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/client.rb:119:in `perform_request'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-api-1.0.12/lib/elasticsearch/api/actions/bulk.rb:80:in `bulk'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch/protocol.rb:104:in `bulk'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:542:in `submit'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:566:in `flush'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/stud-0.0.21/lib/stud/buffer.rb:219:in `buffer_flush'",
"org/jruby/RubyHash.java:1341:in `each'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/stud-0.0.21/lib/stud/buffer.rb:216:in `buffer_flush'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:600:in `teardown'",
"org/jruby/RubyArray.java:1613:in `each'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:248:in `outputworker'",
"org/jruby/RubyArray.java:1613:in `each'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:247:in `outputworker'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:166:in `start_outputs'"], :level=>:warn}←[0m