
I am new to Hadoop, and I am trying to get the maximum salary of the employees. My data looks like

    1231,"","","",4000<br/>
    1232,"","","",5000<br/>
    ..................<br/>
    ..................<br/>

Here is my mapper class, where I am trying to emit the complete tuple

public class maxmap extends MapReduceBase implements Mapper<LongWritable,Text,Text,employee> {


    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text,employee> outputCollector, Reporter reporter)
            throws IOException {
        // TODO Auto-generated method stub
        //TupleWritable tupleWritable= new TupleWritable(new Writable[]{new Text(value.toString().split(",")[1]),
                //new Text(value.toString().split(",")[0])
        //});

        String employeeId = value.toString().split(",")[0];

        int count =1231;
        employee employee=new employee(Integer.parseInt(employeeId), "employeeName", "StemployeeDept", "employeeJoinDt",1231);
        //tupleWritable.write();
        outputCollector.collect(new Text("max salary"),employee);

    }
}

Here is my reducer class

public class maxsalreduce extends MapReduceBase implements Reducer<Text,employee,Text,IntWritable> {

    @Override
    public void reduce(Text key, Iterator<employee> values,
            OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException {
        // TODO Auto-generated method stub
        System.out.println("in reducer");
        while(values.hasNext()){
            employee employee=values.next();
            System.out.println("employee id"+employee.employeeId);

        }
    collector.collect(new Text(""), new IntWritable(1));

    }
}

Here is my employee class

public class employee  implements Writable{

    public int employeeId;
    private String employeeName;
    private String employeeDept;
    private String employeeJoinDt;
    public employee(int employeeId,String employeeName,String employeeDept,String employeeJoinDt,int employeeSalary){
        this.employeeId=employeeId;
        System.out.println(this.employeeId);
        this.employeeName=employeeName;
        this.employeeDept=employeeDept;
        this.employeeJoinDt=employeeJoinDt;
        this.employeeSalary=employeeSalary;
    }
    public employee() {
        // TODO Auto-generated constructor stub
    }
    public int getEmployeeId() {
        return employeeId;
    }

    public void setEmployeeId(int employeeId) {
        this.employeeId = employeeId;
    }

    public String getEmployeeName() {
        return employeeName;
    }

    public void setEmployeeName(String employeeName) {
        this.employeeName = employeeName;
    }

    public String getEmployeeDept() {
        return employeeDept;
    }

    public void setEmployeeDept(String employeeDept) {
        this.employeeDept = employeeDept;
    }

    public String getEmployeeJoinDt() {
        return employeeJoinDt;
    }

    public void setEmployeeJoinDt(String employeeJoinDt) {
        this.employeeJoinDt = employeeJoinDt;
    }

    public int getEmployeeSalary() {
        return employeeSalary;
    }

    public void setEmployeeSalary(int employeeSalary) {
        this.employeeSalary = employeeSalary;
    }

    private int employeeSalary;
    @Override
    public void readFields(DataInput input) throws IOException {
        // TODO Auto-generated method stub
        System.out.println("employee id is"+input.readInt());
        //this.employeeId=input.readInt();
        //this.employeeName=input.readUTF();
        //this.employeeDept=input.readUTF();
        //this.employeeJoinDt=input.readUTF();
        //this.employeeSalary=input.readInt();
        new employee(input.readInt(),input.readUTF(),input.readUTF(),input.readUTF(),input.readInt());
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // TODO Auto-generated method stub
        output.writeInt(this.employeeId);
        output.writeUTF(this.employeeName);
        output.writeUTF(this.employeeDept);
        output.writeUTF(this.employeeJoinDt);
        output.writeInt(this.employeeSalary);

    }
}

Here is my job runner

public class jobrunner {

    public static void main(String[] args) throws IOException
    {


        JobConf jobConf = new JobConf(jobrunner.class);
        jobConf.setJobName("Count no of employees");
        jobConf.setMapperClass(maxmap.class);
        jobConf.setReducerClass(maxsalreduce.class);

        FileInputFormat.setInputPaths(jobConf, new Path("hdfs://localhost:9000/employee_data.txt"));
        FileOutputFormat.setOutputPath(jobConf,new Path("hdfs://localhost:9000/dummy20.txt"));
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(employee.class);
        JobClient.runJob(jobConf);
    }
}

Here is the exception I am getting

java.lang.RuntimeException: problem advancing post rec#0
    at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1214)
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:249)
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:245)
    at tuplewritable.maxsalreduce.reduce(maxsalreduce.java:24)
    at tuplewritable.maxsalreduce.reduce(maxsalreduce.java:1)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:260)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)
    at java.io.DataInputStream.readUTF(DataInputStream.java:592)
    at java.io.DataInputStream.readUTF(DataInputStream.java:547)
    at tuplewritable.employee.readFields(employee.java:76)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
    at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1271)
    at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1211)
    ... 7 more
13/08/17 20:44:14 INFO mapred.JobClient:  map 100% reduce 0%
13/08/17 20:44:14 INFO mapred.JobClient: Job complete: job_local_0001
13/08/17 20:44:14 INFO mapred.JobClient: Counters: 21
13/08/17 20:44:14 INFO mapred.JobClient:   File Input Format Counters 
13/08/17 20:44:14 INFO mapred.JobClient:     Bytes Read=123
13/08/17 20:44:14 INFO mapred.JobClient:   FileSystemCounters
13/08/17 20:44:14 INFO mapred.JobClient:     FILE_BYTES_READ=146
13/08/17 20:44:14 INFO mapred.JobClient:     HDFS_BYTES_READ=123
13/08/17 20:44:14 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=39985
13/08/17 20:44:14 INFO mapred.JobClient:   Map-Reduce Framework
13/08/17 20:44:14 INFO mapred.JobClient:     Map output materialized bytes=270
13/08/17 20:44:14 INFO mapred.JobClient:     Map input records=4
13/08/17 20:44:14 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/08/17 20:44:14 INFO mapred.JobClient:     Spilled Records=4
13/08/17 20:44:14 INFO mapred.JobClient:     Map output bytes=256
13/08/17 20:44:14 INFO mapred.JobClient:     Total committed heap usage (bytes)=160763904
13/08/17 20:44:14 INFO mapred.JobClient:     CPU time spent (ms)=0
13/08/17 20:44:14 INFO mapred.JobClient:     Map input bytes=123
13/08/17 20:44:14 INFO mapred.JobClient:     SPLIT_RAW_BYTES=92
13/08/17 20:44:14 INFO mapred.JobClient:     Combine input records=0
13/08/17 20:44:14 INFO mapred.JobClient:     Reduce input records=0
13/08/17 20:44:14 INFO mapred.JobClient:     Reduce input groups=0
13/08/17 20:44:14 INFO mapred.JobClient:     Combine output records=0
13/08/17 20:44:14 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/08/17 20:44:14 INFO mapred.JobClient:     Reduce output records=0
13/08/17 20:44:14 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/08/17 20:44:14 INFO mapred.JobClient:     Map output records=4
13/08/17 20:44:14 INFO mapred.JobClient: Job Failed: NA
Exception in thread "main" java.io.IOException: Job failed!
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1265)
    at tuplewritable.jobrunner.main(jobrunner.java:30)
13/08/17 20:44:14 ERROR hdfs.DFSClient: Exception closing file /dummy20.txt/_temporary/_attempt_local_0001_r_000000_0/part-00000 : org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on /dummy20.txt/_temporary/_attempt_local_0001_r_000000_0/part-00000 File does not exist. Holder DFSClient_1595916561 does not have any open files.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1629)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1620)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:1675)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:1663)
    at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:718)
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1083)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)

org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on /dummy20.txt/_temporary/_attempt_local_0001_r_000000_0/part-00000 File does not exist. Holder DFSClient_1595916561 does not have any open files.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1629)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1620)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:1675)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:1663)
    at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:718)
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1083)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)

    at org.apache.hadoop.ipc.Client.call(Client.java:1066)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
    at $Proxy1.complete(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at $Proxy1.complete(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:3894)
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:3809)
    at org.apache.hadoop.hdfs.DFSClient$LeaseChecker.close(DFSClient.java:1342)
    at org.apache.hadoop.hdfs.DFSClient.close(DFSClient.java:275)
    at org.apache.hadoop.hdfs.DistributedFileSystem.close(DistributedFileSystem.java:328)
    at org.apache.hadoop.fs.FileSystem$Cache.closeAll(FileSystem.java:1446)
    at org.apache.hadoop.fs.FileSystem.closeAll(FileSystem.java:277)
    at org.apache.hadoop.fs.FileSystem$ClientFinalizer.run(FileSystem.java:260)

1 Answer


You have the following problems in your classes.

In the employee class, remove

System.out.println("employee id is"+input.readInt());

and

   new employee(input.readInt(),input.readUTF(),input.readUTF(),
   input.readUTF(),input.readInt());

from

@Override
    public void readFields(DataInput input) throws IOException {
        // TODO Auto-generated method stub
        System.out.println("employee id is"+input.readInt());
        //this.employeeId=input.readInt();
        //this.employeeName=input.readUTF();
        //this.employeeDept=input.readUTF();
        //this.employeeJoinDt=input.readUTF();
        //this.employeeSalary=input.readInt();
        new employee(input.readInt(),input.readUTF(),input.readUTF(),input.readUTF(),input.readInt());
    }

Reason: the `System.out.println("employee id is"+input.readInt());` call has already consumed (deserialized) the first int of the record, so the subsequent `input.readInt()` and `input.readUTF()` calls read from the wrong position in the stream and eventually hit the EOFException you see in the stack trace. As for the other line, `new employee(....)`: it creates a throwaway object and never assigns the fields of `this`, so you probably already know not to use it like that. At least I don't.
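
For reference, a minimal sketch of how readFields could be written instead, following the standard Writable contract that fields are read back in exactly the same order and with the same types that write() emits them (adapted to the fields shown above, not tested against your exact setup):

@Override
public void readFields(DataInput input) throws IOException {
    // Mirror write(): same field order, same types, assigned to this
    this.employeeId = input.readInt();
    this.employeeName = input.readUTF();
    this.employeeDept = input.readUTF();
    this.employeeJoinDt = input.readUTF();
    this.employeeSalary = input.readInt();
}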

Next, in the jobrunner class,

remove this line:

jobConf.setOutputValueClass(employee.class);

and add these lines:

jobConf.setMapOutputKeyClass(Text.class);
jobConf.setMapOutputValueClass(employee.class);
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(IntWritable.class);
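
The map output classes have to be declared separately here because your mapper emits (Text, employee) pairs while your reducer outputs (Text, IntWritable). setOutputKeyClass/setOutputValueClass describe only the final job output; when the map output classes are not set explicitly, the framework assumes they are the same as the job output classes, which does not match what your mapper actually writes.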

P.S.: Please start class names with an uppercase letter; otherwise you are breaking Java naming conventions.

Answered 2013-08-17T07:43:21.603