我们有一些 json 数据存储到 HDFS 中,我们正在尝试使用 elasticsearch-hadoop map reduce 将数据摄取到 Elasticsearch 中。
我们使用的代码很简单(下)
public class TestOneFileJob extends Configured implements Tool {
public static class Tokenizer extends MapReduceBase
implements Mapper<LongWritable, Text, LongWritable, Text> {
@Override
public void map(LongWritable arg0, Text value, OutputCollector<LongWritable, Text> output,
Reporter reporter) throws IOException {
output.collect(arg0, value);
}
}
@Override
public int run(String[] args) throws Exception {
JobConf job = new JobConf(getConf(), TestOneFileJob.class);
job.setJobName("demo.mapreduce");
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(EsOutputFormat.class);
job.setMapperClass(Tokenizer.class);
job.setSpeculativeExecution(false);
FileInputFormat.setInputPaths(job, new Path(args[1]));
job.set("es.resource.write", "{index_name}/live_tweets");
job.set("es.nodes", "els-test.css.org");
job.set("es.input.json", "yes");
job.setMapOutputValueClass(Text.class);
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new TestOneFileJob(), args));
}
}
这段代码运行良好,但我们有两个问题。
第一个问题是es.resource.write
财产的价值。目前它由index_name
json 的属性提供。
如果 json 包含数组类型的属性,例如
{
"tags" : [{"tag" : "tag1"}, {"tag" : "tag2"}]
}
例如,我们如何配置es.resource.write
以采用第一个tag
值?
我们尝试使用{tags.tag}
,{tags[0].tag}
但要么没有工作。
另一个问题,如何在 tags 属性的两个值中使作业索引 json 文档?