
I have a large amount of data stored in HDFS that we want to index into Elasticsearch. The obvious idea is to use the elasticsearch-hadoop library.

I followed the concepts from this video, and this is the code I wrote for the job.

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class TestOneFileJob extends Configured implements Tool {

    public static class Tokenizer extends MapReduceBase
            implements Mapper<LongWritable, Text, LongWritable, MapWritable> {

        private final MapWritable map = new MapWritable();

        private final Text key = new Text("test");

        @Override
        public void map(LongWritable arg0, Text value, OutputCollector<LongWritable, MapWritable> output, Reporter reporter)
                throws IOException {

            // Wrap the whole input line in a single-entry map under the key "test".
            map.put(key, value);

            output.collect(arg0, map);
        }

    }

    @Override
    public int run(String[] args) throws Exception {

        JobConf job = new JobConf(getConf(), TestOneFileJob.class);

        job.setJobName("demo.mapreduce");
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(EsOutputFormat.class);
        job.setMapperClass(Tokenizer.class);
        job.setMapOutputValueClass(MapWritable.class);
        // Disabled so speculative task attempts don't index documents twice.
        job.setSpeculativeExecution(false);

        FileInputFormat.setInputPaths(job, new Path(args[1]));

        // Target index/type, e.g. "test/test".
        job.set("es.resource", args[2]);
        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new TestOneFileJob(), args));
    }

}

The job runs fine, but the entire JSON ends up in a single field named test in Elasticsearch. Obviously the field name comes from the line private final Text key = new Text("test"); but what I need is the JSON's own fields.

This is how the document appears in Elasticsearch.

{
    "_index": "test",
    "_type": "test",
    "_id": "AVEzNbg4XbZ07JYtWKzv",
    "_score": 1,
    "_source": {
        "test": "{\"id\":\"tag:search.twitter.com,2005:666560492832362496\",\"objectType\":\"activity\",\"actor\":{\"objectType\":\"person\",\"id\":\"id:twitter.com:2305228178\",\"link\":\"http://www.twitter.com/alert01\",\"displayName\":\"Himanshu\",\"postedTime\":\"2014-01-22T17:49:57.000Z\",\"image\":\"https://pbs.twimg.com/profile_images/468092875440275456/jJkHRnQF_normal.jpeg\",\"summary\":\"A Proud Indian ; A Nationalist ; Believe in India First\",\"links\":[{\"href\":null,\"rel\":\"me\"}],\"friendsCount\":385,\"followersCount\":2000,\"listedCount\":83,\"statusesCount\":103117,\"twitterTimeZone\":\"New Delhi\",\"verified\":false,\"utcOffset\":\"19800\",\"preferredUsername\":\"alert01\",\"languages\":[\"en-gb\"],\"favoritesCount\":10},\"verb\":\"share\",\"postedTime\":\"2015-11-17T10:16:20.000Z\",\"generator\":{\"displayName\":\"Twitter for Android\",\"link\":\"http://twitter.com/download/android\"},\"provider\":{\"objectType\":\"service\",\"displayName\":\"Twitter\",\"link\":\"http://www.twitter.com\"},\"link\":\"http://twitter.com/alert01/statuses/666560492832362496\",\"body\":\"RT @UnSubtleDesi: Raje didnt break rules bt Media hounded her for weeks demndng resignatn on \\\"moral ground\\\".A massve dynasty scam unfoldng …\",\"object\":{\"id\":\"tag:search.twitter.com,2005:666559923673653248\",\"objectType\":\"activity\",\"actor\":{\"objectType\":\"person\",\"id\":\"id:twitter.com:17741799\",\"link\":\"http://www.twitter.com/UnSubtleDesi\",\"displayName\":\"Vande Mataram\",\"postedTime\":\"2008-11-29T21:12:05.000Z\",\"image\":\"https://pbs.twimg.com/profile_images/648362451717648384/-7oGuhfN_normal.jpg\",\"summary\":\"I apologise if I end up offending u unintentionally. In all probability, it was acutely intentional. http://saffronscarf.blogspot.in\",\"links\":[{\"href\":null,\"rel\":\"me\"}],\"friendsCount\":786,\"followersCount\":25198,\"listedCount\":155,\"statusesCount\":71853,\"twitterTimeZone\":null,\"verified\":false,\"utcOffset\":null,\"preferredUsername\":\"UnSubtleDesi\",\"languages\":[\"en\"],\"favoritesCount\":21336},\"verb\":\"post\",\"postedTime\":\"2015-11-17T10:14:04.000Z\",\"generator\":{\"displayName\":\"Twitter for Android\",\"link\":\"http://twitter.com/download/android\"},\"provider\":{\"objectType\":\"service\",\"displayName\":\"Twitter\",\"link\":\"http://www.twitter.com\"},\"link\":\"http://twitter.com/UnSubtleDesi/statuses/666559923673653248\",\"body\":\"Raje didnt break rules bt Media hounded her for weeks demndng resignatn on \\\"moral ground\\\".A massve dynasty scam unfoldng here. Eerie silence\",\"object\":{\"objectType\":\"note\",\"id\":\"object:search.twitter.com,2005:666559923673653248\",\"summary\":\"Raje didnt break rules bt Media hounded her for weeks demndng resignatn on \\\"moral ground\\\".A massve dynasty scam unfoldng here. 
Eerie silence\",\"link\":\"http://twitter.com/UnSubtleDesi/statuses/666559923673653248\",\"postedTime\":\"2015-11-17T10:14:04.000Z\"},\"inReplyTo\":{\"link\":\"http://twitter.com/UnSubtleDesi/statuses/666554154169446400\"},\"favoritesCount\":5,\"twitter_entities\":{\"hashtags\":[],\"urls\":[],\"user_mentions\":[],\"symbols\":[]},\"twitter_filter_level\":\"low\",\"twitter_lang\":\"en\"},\"favoritesCount\":0,\"twitter_entities\":{\"hashtags\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"UnSubtleDesi\",\"name\":\"Vande Mataram\",\"id\":17741799,\"id_str\":\"17741799\",\"indices\":[3,16]}],\"symbols\":[]},\"twitter_filter_level\":\"low\",\"twitter_lang\":\"en\",\"retweetCount\":9,\"gnip\":{\"matching_rules\":[{\"tag\":\"ISIS40\"}],\"klout_score\":54,\"language\":{\"value\":\"en\"}}}"
 }
} 

One option is to parse the JSON manually and map each key in the JSON to its own field, along the lines of the sketch below.
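
For reference, a rough sketch of what that manual approach could look like, assuming Jackson's databind API (com.fasterxml.jackson) is on the classpath and reusing the Hadoop imports from the job above; the JsonFieldMapper name and its handling of nested values are illustrative, not tested code.

import java.util.Iterator;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.io.NullWritable;

// Illustrative only: parses each line and copies every top-level JSON key
// into the MapWritable, so each key becomes its own Elasticsearch field.
public static class JsonFieldMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, NullWritable, MapWritable> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void map(LongWritable offset, Text line,
            OutputCollector<NullWritable, MapWritable> output, Reporter reporter)
            throws IOException {

        MapWritable doc = new MapWritable();
        JsonNode root = mapper.readTree(line.toString());

        Iterator<Map.Entry<String, JsonNode>> fields = root.fields();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> field = fields.next();
            JsonNode v = field.getValue();
            // Scalars keep their text form; nested objects/arrays stay as raw
            // JSON strings here (a complete version would recurse into them).
            doc.put(new Text(field.getKey()),
                    new Text(v.isValueNode() ? v.asText() : v.toString()));
        }

        output.collect(NullWritable.get(), doc);
    }
}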

Is there any other option?


1 Answer


You need to set es.input.json to true:

job.set("es.input.json","true");
job.setMapOutputValueClass(Text.class);

This tells elasticsearch-hadoop that the data is already in JSON format. Your mapper output should then look like

output.collect(NullWritable.get(), value);

where value is the JSON string.
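
Putting it together, a minimal sketch of how the mapper and job setup from the question could look with these changes applied (the JsonPassthroughMapper name is invented here; NullWritable comes from org.apache.hadoop.io):

import org.apache.hadoop.io.NullWritable;

// The value is passed through untouched; with es.input.json set to true,
// elasticsearch-hadoop treats it as an already-serialized JSON document.
public static class JsonPassthroughMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, NullWritable, Text> {

    @Override
    public void map(LongWritable offset, Text value,
            OutputCollector<NullWritable, Text> output, Reporter reporter)
            throws IOException {
        output.collect(NullWritable.get(), value);
    }
}

// In run():
job.setMapperClass(JsonPassthroughMapper.class);
job.setMapOutputValueClass(Text.class);
job.set("es.input.json", "true");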

Answered 2016-01-25T08:53:45.087