1

我们有一些 json 数据存储到 HDFS 中,我们正在尝试使用 elasticsearch-hadoop map reduce 将数据摄取到 Elasticsearch 中。

我们使用的代码很简单(下)

public class TestOneFileJob extends Configured implements Tool {

    public static class Tokenizer extends MapReduceBase
            implements Mapper<LongWritable, Text, LongWritable, Text> {

        @Override
        public void map(LongWritable arg0, Text value, OutputCollector<LongWritable, Text> output,
                Reporter reporter) throws IOException {

            output.collect(arg0, value);
        }

    }

    @Override
    public int run(String[] args) throws Exception {

        JobConf job = new JobConf(getConf(), TestOneFileJob.class);

        job.setJobName("demo.mapreduce");
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(EsOutputFormat.class);
        job.setMapperClass(Tokenizer.class);
        job.setSpeculativeExecution(false);

        FileInputFormat.setInputPaths(job, new Path(args[1]));

        job.set("es.resource.write", "{index_name}/live_tweets");

        job.set("es.nodes", "els-test.css.org");

        job.set("es.input.json", "yes");
        job.setMapOutputValueClass(Text.class);

        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new TestOneFileJob(), args));
    }
}

这段代码运行良好,但我们有两个问题。

第一个问题是es.resource.write财产的价值。目前它由index_namejson 的属性提供。

如果 json 包含数组类型的属性,例如

{
"tags" : [{"tag" : "tag1"}, {"tag" : "tag2"}]
}

例如,我们如何配置es.resource.write以采用第一个tag值?

我们尝试使用{tags.tag}{tags[0].tag}但要么没有工作。

另一个问题,如何在 tags 属性的两个值中使作业索引 json 文档?

4

1 回答 1

0

我们通过执行以下操作解决了这两个问题

1-在运行方法中,我们将值es.resource.write如下

job.set("es.resource.write", "{tag}/live_tweets");

2- 在 map 函数中,我们使用 gson 库将 json 转换为对象

Object currentValue = gson.fromJson(jsonString, Object.class);
  • 这里的对象是POJO我们拥有的 json

3- 从对象中,我们可以提取我们想要的标签并将其值作为新属性添加到 json。

前面的步骤解决了第一个问题。关于第二个问题(如果我们希望根据标签的数量将相同的 json 存储到多个索引中),我们只需遍历 json 中的标签并更改我们添加的标签属性,然后再次将 json 传递给收集器。以下是此步骤所需的代码。

@Override
        public void map(LongWritable arg0, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter)
                throws IOException {

            List<String> tags = getTags(value.toString());

            for (String tag : tags) {

                String newJson = value.toString().replaceFirst("\\{", "{\"tag\":\""+tag+"\",");

                output.collect(arg0, new Text(newJson));
            }
        }
于 2015-12-02T06:09:40.580 回答