0

我有一组为 hadoop 编写的连续作业。只有第一种方法运行,并以成功运行结束。第二种方法(工作)根本没有开始。这是我的代码:

只是 distanceCalculator 方法运行,它不会继续进行。

import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.omg.CosNaming.IstringHelper;

import weka.classifiers.functions.LibSVM;
import weka.core.Attribute;
import weka.core.FastVector;
import weka.core.Instance;
import weka.core.Instances;
import weka.filters.unsupervised.attribute.Add;

import com.wipro.decision.SVMDecision;
import com.wipro.instance.InstanceData;


//include path of test file

public class UniqueEntityClustering {

static Configuration conf = null;
static  Instances inpInst  = null;
static FastVector fvWekaAttributes = null;
static String pathOfTestFile = "/user/root/TestData.txt";
static String pathOfRefFile = "";
static String pathOfClustFile = "";
static String job3RefFile = "";
static int iterations = 0;

/* Mapper
 * The mapper that will read the input file and generate the distance file
 */

public static class ClustererMapper 
extends Mapper<Object, Text, Text, Text>{

    private FileSystem fs;
    private ArrayList<String> curClustArr = new ArrayList<String>();
    //static int iterations;

//      static String pathOfRefFile = "";

    public void setup(Context context) 
            throws IOException, InterruptedException {

        FileSystem fs = FileSystem.get(new Configuration());
        String fileName = "";
        if(iterations == 0)
            fileName = "/user/root/AllClusterDistances/part-r-00000";
        else
            fileName =     
"/user/root/AllClusterDistances"+iterations+"/part-r-00000";
        Path svmpath = new Path(fileName);
        InputStream ins = fs.open(svmpath);
        BufferedReader reader = new BufferedReader(new 
InputStreamReader(ins));
        String line=null;
        String[] clustPairIDs = null;
        String[] temp = null;

        if((line=reader.readLine())!=null)
            clustPairIDs = line.split("=");

        temp = clustPairIDs[0].split("/");
        curClustArr.add(0, temp[0]);
        curClustArr.add(1, temp[1]);
    }

    public void map(Object key, Text value, Context context) throws 
IOException, InterruptedException {

        String text = value.toString();
        String[] values = null;
        int flag = 0;

        if(iterations < 1)
            values = text.split("\\t");
        else
            values = text.split("=");

        if(!curClustArr.contains(values[0]))
            context.write(new Text(values[0]),new Text("0"));
        else
        {
            if(flag==0)
            {
                flag=1;
                String wKey = curClustArr.get(0) + "+" + 
curClustArr.get(1);
                context.write(new Text(wKey), value);
            }
        }
    }

}

public static class ClustererReducer 
extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, 
            Context context
            ) throws IOException, InterruptedException {

        for (Text val : values) {
            context.write(key, val);
        }
    }
}

public static class DistMapper 
extends Mapper<Object, Text, DoubleWritable, Text>{

    private HashMap<String,Double> RefMap = new HashMap<String,Double>();

    public void setup(Context context) throws IOException, InterruptedException   
{



        String line = null;
        String[] temp = null;

        FileSystem fs = FileSystem.get(new Configuration());
        Path svmpath = new Path(job3RefFile);
        InputStream ins = fs.open(svmpath);
        BufferedReader reader = new BufferedReader(new   

InputStreamReader(ins));

        while((line=reader.readLine())!=null)
        {
            temp = line.split("=");
            RefMap.put(temp[0], Double.valueOf(temp[1]));
        }

    }

    public void map(Object key, Text value, Context context) throws 
IOException, InterruptedException {

        String text = value.toString();
        String[] values = text.split("=");
        String curMapEle = values[0];
        String[] curMapEleArr = curMapEle.split("+");

        String line = null;
        String curFileEle = null;
        String[] curFileEleArr = null;
        String temp = null;
        Double min = null;
        Double dist = null;
        FileSystem fs = FileSystem.get(new Configuration());
        Path svmpath = new Path(pathOfClustFile);
        InputStream ins = fs.open(svmpath);
        BufferedReader reader = new BufferedReader(new 
InputStreamReader(ins));

        while((line=reader.readLine())!=null)
        {
            curFileEle = line.split("=")[0];
            curFileEleArr = curFileEle.split("+");
            if(!curFileEle.equals(curMapEle))
            {
                for(int i=0; i<curMapEleArr.length; i++)
                    for(int j=0; j<curFileEleArr.length; j++)
                    {
                        temp = 
curMapEleArr[i]+"/"+curFileEleArr[j];
                        dist = RefMap.get(temp);
                        if(i==0 && j==0)
                            min=dist;
                        else
                        {
                            if(dist<min)
                                min=dist;
                        }
                    }
                temp = curMapEle + "/" + curFileEle;
                context.write(new DoubleWritable(min), new 
Text(temp));
            }    
        }
    }

}

public static class DistReducer 
extends Reducer<DoubleWritable, Text, Text, DoubleWritable> {

    public void reduce(Text key, Iterable<Text> values, Context context) throws 
IOException, InterruptedException {

        for (Text val : values) {
            context.write(val, new 
DoubleWritable(Double.valueOf(key.toString())));

        }
    }

}

public static class DistanceMapper 
extends Mapper<Object, Text, DoubleWritable, Text>{

    private FileSystem fs;
    static LibSVM svm = null;

    public void setup(Context context) 
    {
        try {
            fs = FileSystem.get(context.getConfiguration());
        } catch (IOException e) {
            e.printStackTrace();
        }

        Attribute Attribute1 = new Attribute("nameDist");
        Attribute Attribute2 = new Attribute("locDist");
        Attribute Attribute3 = new Attribute("workDist");

        FastVector fvNominalVal = new FastVector(2);
        fvNominalVal.addElement("0");
        fvNominalVal.addElement("1");
        Attribute ClassAttribute = new Attribute("class", fvNominalVal);

        fvWekaAttributes = new FastVector(4);
        fvWekaAttributes.addElement(Attribute1);    
        fvWekaAttributes.addElement(Attribute2);  
        fvWekaAttributes.addElement(Attribute3); 
        fvWekaAttributes.addElement(ClassAttribute);

        inpInst = new Instances("Rel", fvWekaAttributes, 1);           
        inpInst.setClassIndex(3);

        FileSystem fs;
        ObjectInputStream ob = null;
        try {
            fs = FileSystem.get(new Configuration());

            Path svmpath = new Path("/user/root/j48.model");
            InputStream ins = fs.open(svmpath);
            ob = new ObjectInputStream(ins);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        //          try {
        try {
            svm = (LibSVM)ob.readObject();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        /*      } catch (ClassNotFoundException e) {
        e.printStackTrace();
    }*/


    }

    public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {

        String text = value.toString();
        String[] values = text.split("\t");
        int mclusterID = Integer.valueOf(values[0]);
        String mfirstName = values[1];
        String mmiddleName = values[2];
        String mlastName = values[3];
        String mlocation = values[4];
        String[] mworkArray = new String[values.length - 5];
        for(int i=5; i<values.length; i++) {
            mworkArray[i-5] = values[i];
        }

        InstanceData mapInstance = new InstanceData(mfirstName, 
mmiddleName, mlastName, mlocation, mworkArray);

        FileSystem fs = FileSystem.get(new Configuration());
        Path path = new Path(pathOfTestFile); //fstatus.getPath();
        FSDataInputStream fis = fs.open(path);
        BufferedReader reader = new BufferedReader(new 
InputStreamReader(fis));
        String line = null;
        while ((line = reader.readLine()) != null) {

            String[] ele = line.split("\t");

            int fclusterID = Integer.valueOf(ele[0]);

            if(fclusterID == mclusterID)
                continue;

            String ffirstName = ele[1];
            String fmiddleName = ele[2];
            String flastName = ele[3];
            String flocation = ele[4];
            String[] fworkArray = new String[ele.length - 5];
            for(int i=5; i<ele.length; i++) {
                fworkArray[i-5] = ele[i];
            }

            InstanceData fileInstance = new InstanceData(ffirstName, 
fmiddleName, flastName, flocation, fworkArray);

            double nameDist = 
NameDistance.getDistanceBetweenNames(mapInstance, fileInstance);
            double locationDist = 
LocationDistance.getDistanceBetweenLocations(mapInstance, fileInstance);
            double workDistance =        
WorkDistance.getDistanceBetweenWorkArr(mapInstance, fileInstance);

            try {
                fs = FileSystem.get(context.getConfiguration());
            } catch (IOException e) {
                e.printStackTrace();
            }

            Attribute Attribute1 = new Attribute("nameDist");
            Attribute Attribute2 = new Attribute("locDist");
            Attribute Attribute3 = new Attribute("workDist");

            FastVector fvNominalVal = new FastVector(2);
            fvNominalVal.addElement("0");
            fvNominalVal.addElement("1");
            Attribute ClassAttribute = new Attribute("class", fvNominalVal);

            fvWekaAttributes = new FastVector(4);
            fvWekaAttributes.addElement(Attribute1);    
            fvWekaAttributes.addElement(Attribute2);  
            fvWekaAttributes.addElement(Attribute3); 
            fvWekaAttributes.addElement(ClassAttribute);

            inpInst = new Instances("Rel", fvWekaAttributes, 1);           

            inpInst.setClassIndex(3);

            Instance inst = new Instance(4);
            inst.setValue((Attribute)fvWekaAttributes.elementAt(0), 
nameDist);      
            inst.setValue((Attribute)fvWekaAttributes.elementAt(1), 
locationDist); 
            inst.setValue((Attribute)fvWekaAttributes.elementAt(2), 
workDistance); 
            inst.setValue((Attribute)fvWekaAttributes.elementAt(3), 
"0");

            inpInst.add(inst);

            //try {
            try {
                svm.classifyInstance(inpInst.instance(0));
            } catch (Exception e) {
                e.printStackTrace();
            }
            double prob = SVMDecision.decision_value;
            DoubleWritable outputKey = new DoubleWritable();
            outputKey.set(prob);
            Text outputValue = new 
Text(String.valueOf(mclusterID)+"/"+String.valueOf(fclusterID));

            context.write(outputKey, outputValue);
            /*                  } catch (Exception 
e) {
                e.printStackTrace();
            }*/
        }
        //}
    }
}

public static class DistanceReducer 
extends Reducer<DoubleWritable, Text, Text, DoubleWritable> {

    public void reduce(DoubleWritable key, Iterable<Text> values, 
            Context context
            ) throws IOException, InterruptedException {

        for (Text val : values) {
            context.write(val, key);
        }
    }
}

public static void distanceCalculator(Path inputDir, Path outputDir) throws 
Exception{

    conf = new Configuration();
    Job job = new Job(conf, "distanceCalculator");
    job.setJarByClass(UniqueEntityClustering.class);
    job.setInputFormatClass(TextInputFormat.class);

    job.setMapperClass(DistanceMapper.class);
    job.setReducerClass(DistanceReducer.class);

    job.setMapOutputKeyClass(DoubleWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    FileInputFormat.addInputPath(job, inputDir);
    FileOutputFormat.setOutputPath(job, outputDir);

    System.exit(job.waitForCompletion(true) ? 0: 1);        

}

public static void findCluster(Path inputPath, Path outputPath, int ite) throws 
IOException, ClassNotFoundException, InterruptedException
{
    conf = new Configuration();
    Job job = new Job(conf, "Clusterer"+ite);
    job.setJarByClass(UniqueEntityClustering.class);
    job.setInputFormatClass(TextInputFormat.class);
    //ClustererMapper.iterations = ite;
    //ClustererMapper.pathOfRefFile = fileName;
    job.setMapperClass(ClustererMapper.class);
    job.setReducerClass(ClustererReducer.class);

    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    System.exit(job.waitForCompletion(true) ? 0: 1);
}

public static void distjob(Path inputPath, Path outputPath, int ite) throws 
IOException, ClassNotFoundException, InterruptedException
{
    conf = new Configuration();
    Job job = new Job(conf, "Dist"+ite);
    job.setJarByClass(UniqueEntityClustering.class);
    job.setInputFormatClass(TextInputFormat.class);

    job.setMapperClass(DistMapper.class);
    job.setReducerClass(DistReducer.class);

    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);
    System.exit(job.waitForCompletion(true) ? 0: 1);
}

public static void main(String[] args) throws Exception {

    conf = new Configuration();
    //String[] otherArgs = new GenericOptionsParser(conf, 
args).getRemainingArgs();
    if (args.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }

    boolean isClusteringTerminated = false;

    String job2OutDir = null;
    String job2InpDir = null;
    String job3InpDir = null;
    String job3OutDir = null;
    String job1OutDir = null;
    String job1InpDir = null;
    String job3OutDirBase = null;

    String job2OutDirBase = "/user/root/clusterer";
    job3OutDirBase = "/user/root/AllClusterDistances";

    job1InpDir = "/user/root/TestData";
    job1OutDir = "/user/root/AllClusterDistances";

    //int iterations = 0;
    //String pathOfRefFile = "";

    //job2InpDir = "/user/root/OutputDist"+String.valueOf(iterations);

    //Start the jobs

    distanceCalculator(new Path(job1InpDir), new Path(job1OutDir));

    //while(isClusteringTerminated == false) {
        //parameters for job 2


        if(iterations == 0) {
            pathOfRefFile = job1OutDir + "/" + "part-r-00000";
            job2InpDir = job1InpDir;
        }
        else {
            pathOfRefFile = job3OutDir+ "/" + "part-r-00000";
            job2InpDir = job2OutDirBase + String.valueOf(iterations - 
1);
        }

        job2OutDir = job2OutDirBase + String.valueOf(iterations);
        findCluster(new Path(job2InpDir), new Path(job2OutDir), 
iterations); //, pathOfRefFile);

        //parameters for job 3
        job3RefFile = job1OutDir + "/" + "part-r-00000";
        job3InpDir = job2OutDir;
        job3OutDir = job3OutDirBase + String.valueOf(iterations);

        distjob(new Path(job3InpDir), new Path(job3OutDir), iterations);

        iterations++;

        isClusteringTerminated = determineTermination(new 
Path(job3OutDir));
    //}

}

public static boolean determineTermination(Path dirPath) {

    FileSystem fs = null;
    try {
        fs = FileSystem.get(new Configuration());
    } catch (IOException e) {
        e.printStackTrace();
    }

    FileStatus[] fstatuses = null;
    try {
        fstatuses = fs.listStatus(dirPath);
    } catch (IOException e) {
        e.printStackTrace();
    }
    for (FileStatus fstatus : fstatuses) {
        Path path = fstatus.getPath();
        if (! path.getName().startsWith("part-r")) {
            continue;
        }
        FSDataInputStream fis = null;
        try {
            fis = fs.open(path);
        } catch (IOException e) {
            e.printStackTrace();
        }
        BufferedReader reader = new BufferedReader(new 
InputStreamReader(fis));
        String line = null;
        try {
            line = reader.readLine();
        } catch (IOException e) {
            e.printStackTrace();
        }
        String[] values = line.split("=");
        double val = Double.valueOf(values[1]);
        if(val < 0.0)
            return true;    

    }
    return false;
}

}
4

0 回答 0