我有一组为 Hadoop 编写的链式(连续)作业。只有第一个作业运行并成功结束;第二个作业根本没有启动。也就是说,只有 distanceCalculator 方法提交的作业在运行,程序之后不再继续往下执行。下面是我的代码:
import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.omg.CosNaming.IstringHelper;
import weka.classifiers.functions.LibSVM;
import weka.core.Attribute;
import weka.core.FastVector;
import weka.core.Instance;
import weka.core.Instances;
import weka.filters.unsupervised.attribute.Add;
import com.wipro.decision.SVMDecision;
import com.wipro.instance.InstanceData;
//include path of test file
public class UniqueEntityClustering {
static Configuration conf = null;
static Instances inpInst = null;
static FastVector fvWekaAttributes = null;
static String pathOfTestFile = "/user/root/TestData.txt";
static String pathOfRefFile = "";
static String pathOfClustFile = "";
static String job3RefFile = "";
static int iterations = 0;
/* Mapper
* The mapper that will read the input file and generate the distance file
*/
public static class ClustererMapper
extends Mapper<Object, Text, Text, Text>{
private FileSystem fs;
private ArrayList<String> curClustArr = new ArrayList<String>();
//static int iterations;
// static String pathOfRefFile = "";
public void setup(Context context)
throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(new Configuration());
String fileName = "";
if(iterations == 0)
fileName = "/user/root/AllClusterDistances/part-r-00000";
else
fileName =
"/user/root/AllClusterDistances"+iterations+"/part-r-00000";
Path svmpath = new Path(fileName);
InputStream ins = fs.open(svmpath);
BufferedReader reader = new BufferedReader(new
InputStreamReader(ins));
String line=null;
String[] clustPairIDs = null;
String[] temp = null;
if((line=reader.readLine())!=null)
clustPairIDs = line.split("=");
temp = clustPairIDs[0].split("/");
curClustArr.add(0, temp[0]);
curClustArr.add(1, temp[1]);
}
public void map(Object key, Text value, Context context) throws
IOException, InterruptedException {
String text = value.toString();
String[] values = null;
int flag = 0;
if(iterations < 1)
values = text.split("\\t");
else
values = text.split("=");
if(!curClustArr.contains(values[0]))
context.write(new Text(values[0]),new Text("0"));
else
{
if(flag==0)
{
flag=1;
String wKey = curClustArr.get(0) + "+" +
curClustArr.get(1);
context.write(new Text(wKey), value);
}
}
}
}
public static class ClustererReducer
extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
for (Text val : values) {
context.write(key, val);
}
}
}
public static class DistMapper
extends Mapper<Object, Text, DoubleWritable, Text>{
private HashMap<String,Double> RefMap = new HashMap<String,Double>();
public void setup(Context context) throws IOException, InterruptedException
{
String line = null;
String[] temp = null;
FileSystem fs = FileSystem.get(new Configuration());
Path svmpath = new Path(job3RefFile);
InputStream ins = fs.open(svmpath);
BufferedReader reader = new BufferedReader(new
InputStreamReader(ins));
while((line=reader.readLine())!=null)
{
temp = line.split("=");
RefMap.put(temp[0], Double.valueOf(temp[1]));
}
}
public void map(Object key, Text value, Context context) throws
IOException, InterruptedException {
String text = value.toString();
String[] values = text.split("=");
String curMapEle = values[0];
String[] curMapEleArr = curMapEle.split("+");
String line = null;
String curFileEle = null;
String[] curFileEleArr = null;
String temp = null;
Double min = null;
Double dist = null;
FileSystem fs = FileSystem.get(new Configuration());
Path svmpath = new Path(pathOfClustFile);
InputStream ins = fs.open(svmpath);
BufferedReader reader = new BufferedReader(new
InputStreamReader(ins));
while((line=reader.readLine())!=null)
{
curFileEle = line.split("=")[0];
curFileEleArr = curFileEle.split("+");
if(!curFileEle.equals(curMapEle))
{
for(int i=0; i<curMapEleArr.length; i++)
for(int j=0; j<curFileEleArr.length; j++)
{
temp =
curMapEleArr[i]+"/"+curFileEleArr[j];
dist = RefMap.get(temp);
if(i==0 && j==0)
min=dist;
else
{
if(dist<min)
min=dist;
}
}
temp = curMapEle + "/" + curFileEle;
context.write(new DoubleWritable(min), new
Text(temp));
}
}
}
}
public static class DistReducer
extends Reducer<DoubleWritable, Text, Text, DoubleWritable> {
public void reduce(Text key, Iterable<Text> values, Context context) throws
IOException, InterruptedException {
for (Text val : values) {
context.write(val, new
DoubleWritable(Double.valueOf(key.toString())));
}
}
}
public static class DistanceMapper
extends Mapper<Object, Text, DoubleWritable, Text>{
private FileSystem fs;
static LibSVM svm = null;
public void setup(Context context)
{
try {
fs = FileSystem.get(context.getConfiguration());
} catch (IOException e) {
e.printStackTrace();
}
Attribute Attribute1 = new Attribute("nameDist");
Attribute Attribute2 = new Attribute("locDist");
Attribute Attribute3 = new Attribute("workDist");
FastVector fvNominalVal = new FastVector(2);
fvNominalVal.addElement("0");
fvNominalVal.addElement("1");
Attribute ClassAttribute = new Attribute("class", fvNominalVal);
fvWekaAttributes = new FastVector(4);
fvWekaAttributes.addElement(Attribute1);
fvWekaAttributes.addElement(Attribute2);
fvWekaAttributes.addElement(Attribute3);
fvWekaAttributes.addElement(ClassAttribute);
inpInst = new Instances("Rel", fvWekaAttributes, 1);
inpInst.setClassIndex(3);
FileSystem fs;
ObjectInputStream ob = null;
try {
fs = FileSystem.get(new Configuration());
Path svmpath = new Path("/user/root/j48.model");
InputStream ins = fs.open(svmpath);
ob = new ObjectInputStream(ins);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// try {
try {
svm = (LibSVM)ob.readObject();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
/* } catch (ClassNotFoundException e) {
e.printStackTrace();
}*/
}
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String text = value.toString();
String[] values = text.split("\t");
int mclusterID = Integer.valueOf(values[0]);
String mfirstName = values[1];
String mmiddleName = values[2];
String mlastName = values[3];
String mlocation = values[4];
String[] mworkArray = new String[values.length - 5];
for(int i=5; i<values.length; i++) {
mworkArray[i-5] = values[i];
}
InstanceData mapInstance = new InstanceData(mfirstName,
mmiddleName, mlastName, mlocation, mworkArray);
FileSystem fs = FileSystem.get(new Configuration());
Path path = new Path(pathOfTestFile); //fstatus.getPath();
FSDataInputStream fis = fs.open(path);
BufferedReader reader = new BufferedReader(new
InputStreamReader(fis));
String line = null;
while ((line = reader.readLine()) != null) {
String[] ele = line.split("\t");
int fclusterID = Integer.valueOf(ele[0]);
if(fclusterID == mclusterID)
continue;
String ffirstName = ele[1];
String fmiddleName = ele[2];
String flastName = ele[3];
String flocation = ele[4];
String[] fworkArray = new String[ele.length - 5];
for(int i=5; i<ele.length; i++) {
fworkArray[i-5] = ele[i];
}
InstanceData fileInstance = new InstanceData(ffirstName,
fmiddleName, flastName, flocation, fworkArray);
double nameDist =
NameDistance.getDistanceBetweenNames(mapInstance, fileInstance);
double locationDist =
LocationDistance.getDistanceBetweenLocations(mapInstance, fileInstance);
double workDistance =
WorkDistance.getDistanceBetweenWorkArr(mapInstance, fileInstance);
try {
fs = FileSystem.get(context.getConfiguration());
} catch (IOException e) {
e.printStackTrace();
}
Attribute Attribute1 = new Attribute("nameDist");
Attribute Attribute2 = new Attribute("locDist");
Attribute Attribute3 = new Attribute("workDist");
FastVector fvNominalVal = new FastVector(2);
fvNominalVal.addElement("0");
fvNominalVal.addElement("1");
Attribute ClassAttribute = new Attribute("class", fvNominalVal);
fvWekaAttributes = new FastVector(4);
fvWekaAttributes.addElement(Attribute1);
fvWekaAttributes.addElement(Attribute2);
fvWekaAttributes.addElement(Attribute3);
fvWekaAttributes.addElement(ClassAttribute);
inpInst = new Instances("Rel", fvWekaAttributes, 1);
inpInst.setClassIndex(3);
Instance inst = new Instance(4);
inst.setValue((Attribute)fvWekaAttributes.elementAt(0),
nameDist);
inst.setValue((Attribute)fvWekaAttributes.elementAt(1),
locationDist);
inst.setValue((Attribute)fvWekaAttributes.elementAt(2),
workDistance);
inst.setValue((Attribute)fvWekaAttributes.elementAt(3),
"0");
inpInst.add(inst);
//try {
try {
svm.classifyInstance(inpInst.instance(0));
} catch (Exception e) {
e.printStackTrace();
}
double prob = SVMDecision.decision_value;
DoubleWritable outputKey = new DoubleWritable();
outputKey.set(prob);
Text outputValue = new
Text(String.valueOf(mclusterID)+"/"+String.valueOf(fclusterID));
context.write(outputKey, outputValue);
/* } catch (Exception
e) {
e.printStackTrace();
}*/
}
//}
}
}
public static class DistanceReducer
extends Reducer<DoubleWritable, Text, Text, DoubleWritable> {
public void reduce(DoubleWritable key, Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
for (Text val : values) {
context.write(val, key);
}
}
}
public static void distanceCalculator(Path inputDir, Path outputDir) throws
Exception{
conf = new Configuration();
Job job = new Job(conf, "distanceCalculator");
job.setJarByClass(UniqueEntityClustering.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(DistanceMapper.class);
job.setReducerClass(DistanceReducer.class);
job.setMapOutputKeyClass(DoubleWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, inputDir);
FileOutputFormat.setOutputPath(job, outputDir);
System.exit(job.waitForCompletion(true) ? 0: 1);
}
public static void findCluster(Path inputPath, Path outputPath, int ite) throws
IOException, ClassNotFoundException, InterruptedException
{
conf = new Configuration();
Job job = new Job(conf, "Clusterer"+ite);
job.setJarByClass(UniqueEntityClustering.class);
job.setInputFormatClass(TextInputFormat.class);
//ClustererMapper.iterations = ite;
//ClustererMapper.pathOfRefFile = fileName;
job.setMapperClass(ClustererMapper.class);
job.setReducerClass(ClustererReducer.class);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
System.exit(job.waitForCompletion(true) ? 0: 1);
}
public static void distjob(Path inputPath, Path outputPath, int ite) throws
IOException, ClassNotFoundException, InterruptedException
{
conf = new Configuration();
Job job = new Job(conf, "Dist"+ite);
job.setJarByClass(UniqueEntityClustering.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(DistMapper.class);
job.setReducerClass(DistReducer.class);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
System.exit(job.waitForCompletion(true) ? 0: 1);
}
public static void main(String[] args) throws Exception {
conf = new Configuration();
//String[] otherArgs = new GenericOptionsParser(conf,
args).getRemainingArgs();
if (args.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
boolean isClusteringTerminated = false;
String job2OutDir = null;
String job2InpDir = null;
String job3InpDir = null;
String job3OutDir = null;
String job1OutDir = null;
String job1InpDir = null;
String job3OutDirBase = null;
String job2OutDirBase = "/user/root/clusterer";
job3OutDirBase = "/user/root/AllClusterDistances";
job1InpDir = "/user/root/TestData";
job1OutDir = "/user/root/AllClusterDistances";
//int iterations = 0;
//String pathOfRefFile = "";
//job2InpDir = "/user/root/OutputDist"+String.valueOf(iterations);
//Start the jobs
distanceCalculator(new Path(job1InpDir), new Path(job1OutDir));
//while(isClusteringTerminated == false) {
//parameters for job 2
if(iterations == 0) {
pathOfRefFile = job1OutDir + "/" + "part-r-00000";
job2InpDir = job1InpDir;
}
else {
pathOfRefFile = job3OutDir+ "/" + "part-r-00000";
job2InpDir = job2OutDirBase + String.valueOf(iterations -
1);
}
job2OutDir = job2OutDirBase + String.valueOf(iterations);
findCluster(new Path(job2InpDir), new Path(job2OutDir),
iterations); //, pathOfRefFile);
//parameters for job 3
job3RefFile = job1OutDir + "/" + "part-r-00000";
job3InpDir = job2OutDir;
job3OutDir = job3OutDirBase + String.valueOf(iterations);
distjob(new Path(job3InpDir), new Path(job3OutDir), iterations);
iterations++;
isClusteringTerminated = determineTermination(new
Path(job3OutDir));
//}
}
public static boolean determineTermination(Path dirPath) {
FileSystem fs = null;
try {
fs = FileSystem.get(new Configuration());
} catch (IOException e) {
e.printStackTrace();
}
FileStatus[] fstatuses = null;
try {
fstatuses = fs.listStatus(dirPath);
} catch (IOException e) {
e.printStackTrace();
}
for (FileStatus fstatus : fstatuses) {
Path path = fstatus.getPath();
if (! path.getName().startsWith("part-r")) {
continue;
}
FSDataInputStream fis = null;
try {
fis = fs.open(path);
} catch (IOException e) {
e.printStackTrace();
}
BufferedReader reader = new BufferedReader(new
InputStreamReader(fis));
String line = null;
try {
line = reader.readLine();
} catch (IOException e) {
e.printStackTrace();
}
String[] values = line.split("=");
double val = Double.valueOf(values[1]);
if(val < 0.0)
return true;
}
return false;
}
}