0

我为 Hadoop 编写了如下的 Map 阶段程序:

package org.myorg;

import java.io.*;
import java.util.*;
import java.sql.*;
import java.util.logging.Level;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ParallelTraining {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {
        private final static LongWritable zero = new LongWritable(0);
        private Text word = new Text();
        private Text outputvalue = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            Configuration conf = new Configuration();
            int CountComputers; 

            FileInputStream fstream = new FileInputStream(
                    "/export/hadoop-1.0.1/bin/countcomputers.txt");

            BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
            String result=br.readLine();
            CountComputers=Integer.parseInt(result);
            input.close();
            fstream.close();

            Connection con = null;
            Statement st = null;
            ResultSet rs = null;    
            String url = "jdbc:postgresql://192.168.1.8:5432/NexentaSearch";
            String user = "postgres";
            String password = "valter89";


            ArrayList<String> files = new ArrayList<String>();
            ArrayList<String> therms = new ArrayList<String>();
            ArrayList<Integer> idoffiles = new ArrayList<Integer>();
            ArrayList<Integer> countthermsinfiles = new ArrayList<Integer>();
            String currentFile; 
            int CountFiles=0; 
            int CountThermsInFile; 
            int CountFilesOnClusterNode; 
            int RemainFilesOnOneClusterNode; 
            try
            {
                con = DriverManager.getConnection(url, user, password);
                        st = con.createStatement();
                        rs = st.executeQuery("SELECT a.value, c.value, c.id FROM therms a INNER JOIN therms_occurs b ON b.therm_id = a.id INNER JOIN fs_entries c ON b.file_id = c.id ORDER BY a.value, c.value, c.id");
                rs.first();
                therms.add(rs.getString(1));
                CountThermsInFile=1; 
                currentFile=rs.getString(2); 
                idoffiles.add(rs.getInt(3)); 
                CountFiles=1; 
                while (rs.next()) 
                { 
                    if (currentFile!=rs.getString(2)) 
                        {
                            countthermsinfiles.add(CountThermsInFile); 
                            CountFiles+=1; 
                            currentFile=rs.getString(2); 
                            CountThermsInFile=1; 
                            idoffiles.add(rs.getInt(3)); 
                        }
                    else
                        {
                            CountThermsInFile+=1; 
                        }; 
                    therms.add(rs.getString(1));
                };
                countthermsinfiles.add(CountThermsInFile); 
            }
            catch (SQLException e) 
            {
                System.out.println("Connection Failed! Check output console");
                e.printStackTrace();
            }

            if (CountComputers!=2)
            {
                if (CountFiles%CountComputers==0)
                {
                    CountFilesOnClusterNode=CountFiles/CountComputers;
                    RemainFilesOnOneClusterNode=0;
                }
                else
                {
                    CountFilesOnClusterNode=CountFiles/(CountComputers-1);
                    RemainFilesOnOneClusterNode=CountFiles%(CountComputers-1);
                };
            }
            else
            {
                if (CountFiles%2==0)
                {
                    CountFilesOnClusterNode=CountFiles/2;
                    RemainFilesOnOneClusterNode=0;
                }
                else
                {
                    CountFilesOnClusterNode=CountFiles/2+1;
                    RemainFilesOnOneClusterNode=CountFiles-CountFilesOnClusterNode;
                };
            };



            String[] ConcatPaths = new String[CountComputers];
            int NumberTherms = 0;
            int currentCountOfTherms=0;
            if (RemainFilesOnOneClusterNode==0)
            {
                for (int i=0; i<CountComputers; i++)
                {
                    for (int j=0; j<CountFilesOnClusterNode; j++)
                    {
                        for (int k=0; k<countthermsinfiles.get(i*CountFilesOnClusterNode+j); k++)
                            {
                                if (!(k==(countthermsinfiles.get(i*CountFilesOnClusterNode+CountFilesOnClusterNode-1)-1)))
                                {
                                    ConcatPaths[i] +=therms.get(currentCountOfTherms+k)+"\n";
                                }
                                else
                                {
                                    ConcatPaths[i] +=therms.get(currentCountOfTherms+k);
                                };
                            }
                        currentCountOfTherms+=countthermsinfiles.get(i*CountFilesOnClusterNode+j);  
                    }
                }
            }
            else
            {
                for (int i=0; i<CountComputers-1; i++)
                {           
                    for (int j=0; j<CountFilesOnClusterNode; j++)
                    {
                        for (int k=0; k<countthermsinfiles.get(i*CountFilesOnClusterNode+j); k++)
                            {
                                if (!(k==(countthermsinfiles.get(i*CountFilesOnClusterNode+CountFilesOnClusterNode-1)-1)))
                                {
                                    ConcatPaths[i] +=therms.get(currentCountOfTherms+k)+"\n";
                                }
                                else
                                {
                                    ConcatPaths[i] +=therms.get(currentCountOfTherms+k);
                                };
                            }
                        currentCountOfTherms+=countthermsinfiles.get(i*CountFilesOnClusterNode+j);  
                    }           
                };
                for (int j=0; j<RemainFilesOnOneClusterNode; j++)
                    {
                        for (int k=0; k<countthermsinfiles.get((CountComputers-1)*CountFilesOnClusterNode+j); k++)
                            {
                                    if (!(k==(countthermsinfiles.get((CountComputers-1)*CountFilesOnClusterNode+(RemainFilesOnOneClusterNode-1))-1)))
                                    {
                                        ConcatPaths[CountComputers-1] +=therms.get(currentCountOfTherms+k)+"\n";
                                    }
                                    else
                                    {
                                        ConcatPaths[CountComputers-1] +=therms.get(currentCountOfTherms+k);
                                    }
                            }
                        currentCountOfTherms+=countthermsinfiles.get((CountComputers-1)*CountFilesOnClusterNode+j); 
                    };              
            };


            String[] ConcatCountOfThermsAndIds= new String[CountComputers];
            if (RemainFilesOnOneClusterNode==0)
            {
                for (int i=0; i<CountComputers; i++)
                {
                    for (int j=0; j<CountFilesOnClusterNode; j++)
                    {
                        ConcatCountOfThermsAndIds[i]+=Integer.toString(countthermsinfiles.get(i*CountFilesOnClusterNode+j))+" "+Integer.toString(idoffiles.get(i*CountFilesOnClusterNode+j))+" ";
                    };
                };
            }
            else
            {
                for (int i=0; i<CountComputers-1; i++)
                {
                    for (int j=0; j<CountFilesOnClusterNode; j++)
                    {
                        ConcatCountOfThermsAndIds[i]+=Integer.toString(countthermsinfiles.get(i*CountFilesOnClusterNode+j))+" "+Integer.toString(idoffiles.get(i*CountFilesOnClusterNode+j))+" ";
                    };
                };
                for (int j=0; j<RemainFilesOnOneClusterNode; j++)
                {
                    ConcatCountOfThermsAndIds[CountComputers-1]+=Integer.toString(countthermsinfiles.get((CountComputers-1)*CountFilesOnClusterNode+j))+" "+Integer.toString(idoffiles.get((CountComputers-1)*CountFilesOnClusterNode+j))+" ";
                };
            };

            for (int i = 0; i < ConcatPaths.length; i++) {
                word.set(ConcatPaths[i]);
                outputvalue.set(ConcatCountOfThermsAndIds[i]);
                output.collect(word, outputvalue);
            }
        }
    }

由于在集群上执行,我收到以下消息

args[0]=/export/hadoop-1.0.1/bin/input
13/05/01 14:53:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/05/01 14:53:45 INFO mapred.FileInputFormat: Total input paths to process : 1
13/05/01 14:53:46 INFO mapred.JobClient: Running job: job_201304302334_0002
13/05/01 14:53:47 INFO mapred.JobClient:  map 0% reduce 0%
13/05/01 14:58:00 INFO mapred.JobClient:  map 50% reduce 0%
13/05/01 14:58:01 INFO mapred.JobClient: Task Id : attempt_201304302334_0002_m_000000_0, Status : FAILED
java.lang.NullPointerException
    at org.apache.hadoop.io.Text.encode(Text.java:388)
    at org.apache.hadoop.io.Text.set(Text.java:178)
    at org.myorg.ParallelTraining$Map.map(ParallelTraining.java:206)
    at org.myorg.ParallelTraining$Map.map(ParallelTraining.java:15)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

13/05/01 14:59:01 WARN mapred.JobClient: Error reading task outputmyhost3
13/05/01 14:59:01 WARN mapred.JobClient: Error reading task outputmyhost3
13/05/01 14:59:01 INFO mapred.JobClient: Job complete: job_201304302334_0002
13/05/01 14:59:01 INFO mapred.JobClient: Counters: 21
13/05/01 14:59:01 INFO mapred.JobClient:   Job Counters 
13/05/01 14:59:01 INFO mapred.JobClient:     Launched reduce tasks=1
13/05/01 14:59:01 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=39304
13/05/01 14:59:01 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/05/01 14:59:01 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/05/01 14:59:01 INFO mapred.JobClient:     Launched map tasks=5
13/05/01 14:59:01 INFO mapred.JobClient:     Data-local map tasks=5
13/05/01 14:59:01 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=24216
13/05/01 14:59:01 INFO mapred.JobClient:     Failed map tasks=1
13/05/01 14:59:01 INFO mapred.JobClient:   File Input Format Counters 
13/05/01 14:59:01 INFO mapred.JobClient:     Bytes Read=99
13/05/01 14:59:01 INFO mapred.JobClient:   FileSystemCounters
13/05/01 14:59:01 INFO mapred.JobClient:     HDFS_BYTES_READ=215
13/05/01 14:59:01 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=21557
13/05/01 14:59:01 INFO mapred.JobClient:   Map-Reduce Framework
13/05/01 14:59:01 INFO mapred.JobClient:     Map output materialized bytes=6
13/05/01 14:59:01 INFO mapred.JobClient:     Combine output records=0
13/05/01 14:59:01 INFO mapred.JobClient:     Map input records=0
13/05/01 14:59:01 INFO mapred.JobClient:     Spilled Records=0
13/05/01 14:59:01 INFO mapred.JobClient:     Map output bytes=0
13/05/01 14:59:01 INFO mapred.JobClient:     Total committed heap usage (bytes)=160763904
13/05/01 14:59:01 INFO mapred.JobClient:     Map input bytes=0
13/05/01 14:59:01 INFO mapred.JobClient:     Combine input records=0
13/05/01 14:59:01 INFO mapred.JobClient:     Map output records=0
13/05/01 14:59:01 INFO mapred.JobClient:     SPLIT_RAW_BYTES=116
13/05/01 14:59:01 INFO mapred.JobClient: Job Failed: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201304302334_0002_m_000000
Exception in thread "main" java.io.IOException: Job failed!
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1265)
    at org.myorg.ParallelTraining.main(ParallelTraining.java:353)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

错误包含在一行中

word.set(ConcatPaths[i]);

错误的原因是什么?是什么导致了它?请帮我修复这个问题。

4

1 回答 1

1

我怀疑 ConcatPaths[i] 为 null,而 word.set 需要一个非 null 的值。

请注意,数组在任何给定索引处都包含空值是一种非常可能的情况。要修复,您可以在设置之前进行空检查。

if (ConcatPaths[i] != null) word.set(ConcatPaths[i]);

或者,您可以确保值在放入 ConcatPaths 之前不为 null,或两者兼而有之;如果数组中出现了您并不期望的 null 值,还可以记录一条错误。

于 2013-05-01T12:02:06.507 回答