我想将 Storm 中的文档索引到 Elasticsearch 中,但我无法将任何文档索引到 Elasticsearch 中。
在我的拓扑中,我有一个 KafkaSpout,它向一个 EsBolt 发出这样的 json {“tweetId”:1,“text”:“hello”},它是来自 elasticsearch-hadoop 库的本机 bolt,它将 Storm 元组写入 Elasticsearch(doc在这里:https ://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html )。这些是我的 EsBolt 的配置:
Map conf = new HashMap();
conf.put("es.nodes","127.0.0.1");
conf.put("es.port","9200");
conf.put("es.resource","twitter/tweet");
conf.put("es.index.auto.create","no");
conf.put("es.input.json", "true");
conf.put("es.mapping.id", "tweetId");
EsBolt elasticsearchBolt = new EsBolt("twitter/tweet", conf);
前两个配置默认具有这些值,但我选择显式设置它们。我也试过没有它们,得到相同的结果。
这就是我构建拓扑的方式:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(TWEETS_DATA_KAFKA_SPOUT_ID, kafkaSpout, kafkaSpoutParallelism)
.setNumTasks(kafkaSpoutNumberOfTasks);
builder.setBolt(ELASTICSEARCH_BOLT_ID, elasticsearchBolt, elasticsearchBoltParallelism)
.setNumTasks(elasticsearchBoltNumberOfTasks)
.shuffleGrouping(TWEETS_DATA_KAFKA_SPOUT_ID);
return builder.createTopology();
在本地运行拓扑之前,我在 Elasticsearch 中创建了“twitter”索引,并为该索引创建了映射“tweet”。如果我检索我新创建的类型的映射(curl -XGET ' http://localhost:9200/twitter/_mapping/tweet '),这就是我得到的:
{
"twitter": {
"mappings": {
"tweet": {
"properties": {
"text": {
"type": "string"
},
"tweetId": {
"type": "string"
}
}
}
}
}
}
我在本地运行拓扑,这是我在处理元组时在控制台中得到的:
Processing received message FOR 6 TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]
Emitting: elasticsearch-bolt __ack_ack [-8010897758788654352 -6240339405307942979]
TRANSFERING tuple TASK: 2 TUPLE: source: elasticsearch-bolt:6, stream: __ack_ack, id: {}, [-8010897758788654352 -6240339405307942979]
BOLT ack TASK: 6 TIME: TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]
Execute done TUPLE source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}] TASK: 6 DELTA:
所以元组似乎被处理了。但是我没有在 Elasticsearch 中索引任何文档。
我想我在为 EsBolt 设置配置时做错了什么,可能缺少配置或其他东西。