我试图在 Hadoop 中运行一个 MapReduce 程序。它接受一个文本文件作为输入,其中每一行都是一段 JSON 文本。我在 mapper 中使用 json-simple 库解析这些数据,reducer 再对解析结果做进一步的汇总处理。我已经把 json-simple 的 jar 文件放到了 hadoop/lib 文件夹中。代码如下:
package org.myorg;
import java.io.IOException;
import java.util.Iterator;
import java.util.*;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class ALoc
{
public static class AMapper extends Mapper<Text, Text, Text, Text>
{
private Text kword = new Text();
private Text vword = new Text();
JSONParser parser = new JSONParser();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
try {
String line = value.toString();
Object obj = parser.parse(line);
JSONObject jsonObject = (JSONObject) obj;
String val = (String)jsonObject.get("m1") + "," + (String)jsonObject.get("m3");
kword.set((String)jsonObject.get("m0"));
vword.set(val);
context.write(kword, vword);
}
catch (IOException e) {
e.printStackTrace();
}
catch (ParseException e) {
e.printStackTrace();
}
}
}
public static class CountryReducer
extends Reducer<Text,Text,Text,Text>
{
private Text result = new Text();
public void reduce(Text key, Iterable<Text> values,
Context context
) throws IOException, InterruptedException
{
int ccount = 0;
HashMap<Text, Integer> hm = new HashMap<Text, Integer>();
for (Text val : values)
{
if(hm.containsKey(val)){
Integer n = (Integer)hm.get(val);
hm.put(val, n+1);
}else{
hm.put(val, new Integer(1));
}
}
Set set = hm.entrySet();
Iterator i = set.iterator();
String agr = "";
while(i.hasNext()) {
Map.Entry me = (Map.Entry)i.next();
agr += "|" + me.getKey() + me.getValue();
}
result.set(agr);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = new Job(conf, "ALoc");
job.setJarByClass(ALoc.class);
job.setMapperClass(AMapper.class);
job.setReducerClass(CountryReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
当我尝试运行作业时,它给出了以下错误。我是在一台 AWS 微型实例(micro instance)上以单节点集群方式运行的,搭建过程参照了这篇教程: http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-single-node-cluster/
hadoop@domU-18-11-19-02-92-8E:/$ bin/hadoop jar ALoc.jar org.myorg.ALoc /user/hadoop/adata /user/hadoop/adata-op5 -D mapred.reduce.tasks=16
13/02/12 08:39:50 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/02/12 08:39:50 INFO input.FileInputFormat: Total input paths to process : 1
13/02/12 08:39:50 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/02/12 08:39:50 WARN snappy.LoadSnappy: Snappy native library not loaded
13/02/12 08:39:51 INFO mapred.JobClient: Running job: job_201302120714_0006
13/02/12 08:39:52 INFO mapred.JobClient: map 0% reduce 0%
13/02/12 08:40:10 INFO mapred.JobClient: Task Id : attempt_201302120714_0006_m_000000_0, Status : FAILED
java.lang.RuntimeException: Error while running command to get file permissions : java.io.IOException: Cannot run program "/bin/ls": java.io.IOException: error=12, Cannot allocate memory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:475)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:200)
at org.apache.hadoop.util.Shell.run(Shell.java:182)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:375)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:461)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:444)
at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:710)
at org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:443)
at org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.getOwner(RawLocalFileSystem.java:426)
at org.apache.hadoop.mapred.TaskLog.obtainLogDirOwner(TaskLog.java:267)
at org.apache.hadoop.mapred.TaskLogsTruncater.truncateLogs(TaskLogsTruncater.java:124)
at org.apache.hadoop.mapred.Child$4.run(Child.java:260)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.io.IOException: java.io.IOException: error=12, Cannot allocate memory
at java.lang.UNIXProcess.<init>(UNIXProcess.java:164)
at java.lang.ProcessImpl.start(ProcessImpl.java:81)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:468)
... 15 more
at org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:468)
at org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.getOwner(RawLocalFileSystem.java:426)
at org.apache.hadoop.mapred.TaskLog.obtainLogDirOwner(TaskLog.java:267)
at org.apache.hadoop.mapred.TaskLogsTruncater.truncateLogs(TaskLogsTruncater.java:124)
at org.apache.hadoop.mapred.Child$4.run(Child.java:260)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)