我从以下 Map 阶段编写了 hadoop 程序
public class ParallelIndexation {
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, LongWritable> {
private final static LongWritable zero = new LongWritable(0);
private Text word = new Text();
public void map(LongWritable key, Text value,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
//DOMConfigurator.configure("/folder/log4j.xml");
Configuration conf = new Configuration();
int CountComputers;
FileInputStream fstream = new FileInputStream(
"/export/hadoop-1.0.1/bin/countcomputers.txt");
BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
String result=br.readLine();
CountComputers=Integer.parseInt(result);
input.close();
fstream.close();
Connection con = null;
Statement st = null;
ResultSet rs = null;
String url = "jdbc:postgresql://192.168.1.8:5432/NexentaSearch";
String user = "postgres";
String password = "valter89";
ArrayList<String> paths = new ArrayList<String>();
try
{
con = DriverManager.getConnection(url, user, password);
st = con.createStatement();
rs = st.executeQuery("select path from tasks order by id");
while (rs.next()) { paths.add(rs.getString(1)); };
PrintWriter zzz = null;
try
{
zzz = new PrintWriter(new FileOutputStream("/export/hadoop-1.0.1/bin/readwaysfromdatabase.txt"));
}
catch(FileNotFoundException e)
{
System.out.println("Error");
System.exit(0);
}
for (int i=0; i<paths.size(); i++)
{
zzz.println("paths[i]=" + paths.get(i) + "\n");
}
zzz.close();
}
catch (SQLException e)
{
System.out.println("Connection Failed! Check output console");
e.printStackTrace();
}
String[] ConcatPaths = new String[CountComputers];
int NumberOfElementConcatPaths = 0;
if (paths.size() % CountComputers == 0) {
for (int i = 0; i < CountComputers; i++) {
ConcatPaths[i] = paths.get(NumberOfElementConcatPaths);
NumberOfElementConcatPaths += paths.size() / CountComputers;
for (int j = 1; j < paths.size() / CountComputers; j++) {
ConcatPaths[i] += "\n"
+ paths.get(i * paths.size() / CountComputers
+ j);
}
}
} else {
NumberOfElementConcatPaths = 0;
for (int i = 0; i < paths.size() % CountComputers; i++) {
ConcatPaths[i] = paths.get(NumberOfElementConcatPaths);
NumberOfElementConcatPaths += paths.size() / CountComputers
+ 1;
for (int j = 1; j < paths.size() / CountComputers + 1; j++) {
ConcatPaths[i] += "\n"
+ paths.get(i
* (paths.size() / CountComputers + 1)
+ j);
}
}
for (int k = paths.size() % CountComputers; k < CountComputers; k++) {
ConcatPaths[k] = paths.get(NumberOfElementConcatPaths);
NumberOfElementConcatPaths += paths.size() / CountComputers;
for (int j = 1; j < paths.size() / CountComputers; j++) {
ConcatPaths[k] += "\n"
+ paths.get((k - paths.size() % CountComputers)
* paths.size() / CountComputers
+ paths.size() % CountComputers
* (paths.size() / CountComputers + 1)
+ j);
}
}
}
for (int i = 0; i < ConcatPaths.length; i++) {
word.set(ConcatPaths[i]);
output.collect(word, zero);
}
}
}
作为执行的结果,我收到了以下消息
args[0]=/export/hadoop-1.0.1/bin/input
13/04/27 16:02:29 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/04/27 16:02:29 INFO mapred.FileInputFormat: Total input paths to process : 1
13/04/27 16:02:29 INFO mapred.JobClient: Running job: job_201304271556_0001
13/04/27 16:02:30 INFO mapred.JobClient: map 0% reduce 0%
13/04/27 16:06:45 INFO mapred.JobClient: map 50% reduce 0%
13/04/27 16:06:50 INFO mapred.JobClient: Task Id : attempt_201304271556_0001_m_000001_0, Status : FAILED
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:604)
at java.util.ArrayList.get(ArrayList.java:382)
at org.myorg.ParallelIndexation$Map.map(ParallelIndexation.java:100)
at org.myorg.ParallelIndexation$Map.map(ParallelIndexation.java:29)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
13/04/27 16:07:50 WARN mapred.JobClient: Error reading task outputmyhost2
13/04/27 16:07:50 WARN mapred.JobClient: Error reading task outputmyhost2
13/04/27 16:07:50 INFO mapred.JobClient: Job complete: job_201304271556_0001
13/04/27 16:07:50 INFO mapred.JobClient: Counters: 21
13/04/27 16:07:50 INFO mapred.JobClient: Job Counters
13/04/27 16:07:50 INFO mapred.JobClient: Launched reduce tasks=1
13/04/27 16:07:50 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=37867
13/04/27 16:07:50 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/04/27 16:07:50 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/04/27 16:07:50 INFO mapred.JobClient: Launched map tasks=5
13/04/27 16:07:50 INFO mapred.JobClient: Data-local map tasks=5
13/04/27 16:07:50 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=24154
13/04/27 16:07:50 INFO mapred.JobClient: Failed map tasks=1
13/04/27 16:07:50 INFO mapred.JobClient: File Input Format Counters
13/04/27 16:07:50 INFO mapred.JobClient: Bytes Read=99
13/04/27 16:07:50 INFO mapred.JobClient: FileSystemCounters
13/04/27 16:07:50 INFO mapred.JobClient: HDFS_BYTES_READ=215
13/04/27 16:07:50 INFO mapred.JobClient: FILE_BYTES_WRITTEN=21573
13/04/27 16:07:50 INFO mapred.JobClient: Map-Reduce Framework
13/04/27 16:07:50 INFO mapred.JobClient: Map output materialized bytes=6
13/04/27 16:07:50 INFO mapred.JobClient: Combine output records=0
13/04/27 16:07:50 INFO mapred.JobClient: Map input records=0
13/04/27 16:07:50 INFO mapred.JobClient: Spilled Records=0
13/04/27 16:07:50 INFO mapred.JobClient: Map output bytes=0
13/04/27 16:07:50 INFO mapred.JobClient: Total committed heap usage (bytes)=160763904
13/04/27 16:07:50 INFO mapred.JobClient: Map input bytes=0
13/04/27 16:07:50 INFO mapred.JobClient: Combine input records=0
13/04/27 16:07:50 INFO mapred.JobClient: Map output records=0
13/04/27 16:07:50 INFO mapred.JobClient: SPLIT_RAW_BYTES=116
13/04/27 16:07:50 INFO mapred.JobClient: Job Failed: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201304271556_0001_m_000001
Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1265)
at org.myorg.ParallelIndexation.main(ParallelIndexation.java:183)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
我还将给 Map 一个已成功执行的程序阶段(至少从它工作的数据库来看,我至少写了一个执行日志不是绝对理想的,但尽管有必要的记录已输入到数据库中,可能会有所帮助)输入文本文件,其中包含来自数据库的文件的方式到达,写入一行并按字符分区|
(在不可操作的程序中,显然从文本中这些方式是直接从数据库中读取的)
public class ParallelIndexation {
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, LongWritable> {
private final static LongWritable zero = new LongWritable(0);
private Text word = new Text();
public void map(LongWritable key, Text value,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
int CountComputers;
FileInputStream fstream = new FileInputStream(
"/usr/countcomputers.txt");
BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
String result=br.readLine();
CountComputers=Integer.parseInt(result);
in.close();
fstream.close();
ArrayList<String> paths = new ArrayList<String>();
StringTokenizer tokenizer = new StringTokenizer(line, "|");
while (tokenizer.hasMoreTokens()) {
paths.add(tokenizer.nextToken());
}
String[] ConcatPaths = new String[CountComputers];
int NumberOfElementConcatPaths = 0;
if (paths.size() % CountComputers == 0) {
for (int i = 0; i < CountComputers; i++) {
ConcatPaths[i] = paths.get(NumberOfElementConcatPaths);
NumberOfElementConcatPaths += paths.size() / CountComputers;
for (int j = 1; j < paths.size() / CountComputers; j++) {
ConcatPaths[i] += "\n"
+ paths.get(i * paths.size() / CountComputers
+ j);
}
}
} else {
NumberOfElementConcatPaths = 0;
for (int i = 0; i < paths.size() % CountComputers; i++) {
ConcatPaths[i] = paths.get(NumberOfElementConcatPaths);
NumberOfElementConcatPaths += paths.size() / CountComputers
+ 1;
for (int j = 1; j < paths.size() / CountComputers + 1; j++) {
ConcatPaths[i] += "\n"
+ paths.get(i
* (paths.size() / CountComputers + 1)
+ j);
}
}
for (int k = paths.size() % CountComputers; k < CountComputers; k++) {
ConcatPaths[k] = paths.get(NumberOfElementConcatPaths);
NumberOfElementConcatPaths += paths.size() / CountComputers;
for (int j = 1; j < paths.size() / CountComputers; j++) {
ConcatPaths[k] += "\n"
+ paths.get((k - paths.size() % CountComputers)
* paths.size() / CountComputers
+ paths.size() % CountComputers
* (paths.size() / CountComputers + 1)
+ j);
}
}
}
for (int i = 0; i < ConcatPaths.length; i++) {
word.set(ConcatPaths[i]);
output.collect(word, zero);
}
}
}
帮助消除错误。错误在一行
ConcatPaths[i] = paths.get(NumberOfElementConcatPaths);
因为可能paths.size
等于零。但是为什么arraylist
paths
从数据库中读取的结果(请求中指定的表完成了 6 条记录)没有被填充?