6

当我在小数据集上运行代码时,不会出现下面的错误;但在更大的数据集上运行同样的代码并使用 MultipleOutputs 时,就会出现以下错误。请帮忙!

org.apache.hadoop.ipc.RemoteException: 
org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException: failed to create file
/home/users/mlakshm/alop176/data-r-00001 for
DFSClient_attempt_201208010142_0043_r_000001_1 on client 10.0.1.100, because this file
is already being created by DFSClient_attempt_201208010142_0043_r_000001_0 on     10.0.1.130 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:1406)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1246)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1188)
    at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:628)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)

    at org.apache.hadoop.ipc.Client.call(Client.java:1070)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
    at $Proxy2.create(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at $Proxy2.create(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.<init>(DFSClient.java:3248)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:713)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:182)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:555)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:455)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:118)
    at com.a.MultipleOutputs$InternalFileOutputFormat.getRecordWriter(MultipleOutputs.java:565)
    at com.a.MultipleOutputs.getRecordWriter(MultipleOutputs.java:432)
    at com.a.MultipleOutputs.getCollector(MultipleOutputs.java:518)
    at com.a.MultipleOutputs.getCollector(MultipleOutputs.java:482)
    at com.a.ReduceThree1.reduce(ReduceThree1.java:56)
    at com.a.ReduceThree1.reduce(ReduceThree1.java:1)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)

    at org.apache.hadoop.mapred.Child.main(Child.java:249)


reduce类如下:

/**
 * Reducer that fans each key group out to two named outputs of {@code MultipleOutputs}.
 *
 * <p>Every input value is expected to be two whitespace-separated tokens, {@code <uid> <f_val>},
 * where {@code f_val} parses as an {@code int}. Every key is expected to be two
 * whitespace-separated tokens, {@code <t> <data>}.
 *
 * <p>For each key group the reducer writes:
 * <ul>
 *   <li>to the "data" named output, one record per value:
 *       key {@code "<t> <uid>"}, value {@code "<data> <f_val> <sum>"}, where {@code sum} is the
 *       total of all {@code f_val} in the group;</li>
 *   <li>to the "datepairs" named output, one record per unordered pair of non-equal entries:
 *       key {@code "null"}, value {@code "<t> <smaller uid> <larger uid> <min(f_val_i, f_val_j)>"}.</li>
 * </ul>
 *
 * <p>Nothing is written to the regular {@code output} collector.
 *
 * <p>NOTE(review): both named-output names are absolute HDFS paths. If the collector creates
 * files directly at those locations, a second attempt of the same reduce task (retry or
 * speculative execution) targets the same file name as the first attempt — confirm that
 * {@code com.a.MultipleOutputs} writes under the task work directory instead.
 */
public class ReduceThree1 extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    /** Named output receiving one record per input value. */
    private static final String DATA_OUTPUT = "/home/users/mlakshm/alop176/data";

    /** Named output receiving one record per pair of distinct entries. */
    private static final String PAIRS_OUTPUT = "/home/users/mlakshm/alop177/datepairs";

    /** Key used for every record written to {@link #PAIRS_OUTPUT} (the literal text "null"). */
    private static final String PAIR_KEY = "null";

    private MultipleOutputs mos;

    /**
     * Creates the {@code MultipleOutputs} helper for this task.
     *
     * @param conf1 job configuration the named outputs were registered on
     */
    public void configure(JobConf conf1) {
        mos = new MultipleOutputs(conf1);
    }

    /**
     * Emits the per-value and per-pair records for one key group.
     *
     * @param key      two whitespace-separated tokens: {@code t} and {@code data}
     * @param values   values of the group, each {@code "<uid> <f_val>"}
     * @param output   regular job output; not used, all records go to named outputs
     * @param reporter progress reporter handed through to the named-output collectors
     * @throws IOException if a named-output collector fails to write
     * @throws java.util.NoSuchElementException if the key or a value has fewer than two tokens
     * @throws NumberFormatException if an {@code f_val} token is not an integer
     */
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
            Reporter reporter) throws IOException {

        // long rather than int: a large group must not silently wrap the total.
        long sum = 0;
        // All entries are buffered because the total is needed before the first record is written.
        ArrayList<CustomMapI> entries = new ArrayList<CustomMapI>();
        while (values.hasNext()) {
            StringTokenizer valueTokens = new StringTokenizer(values.next().toString());
            String uid = valueTokens.nextToken();
            String fVal = valueTokens.nextToken();
            entries.add(new CustomMapI(uid, fVal));
            sum += Integer.parseInt(fVal);
        }

        StringTokenizer keyTokens = new StringTokenizer(key.toString());
        String t = keyTokens.nextToken();
        String data = keyTokens.nextToken();

        int count = entries.size();
        for (int i = 0; i < count; i++) {
            CustomMapI current = entries.get(i);
            String uidI = current.getUid();

            String dataKey = t + " " + uidI;
            String dataVal = data + " " + current.getF_val() + " " + sum;
            mos.getCollector(DATA_OUTPUT, reporter).collect(new Text(dataKey), new Text(dataVal));

            // Start at i + 1 so each unordered pair is visited exactly once.
            for (int j = i + 1; j < count; j++) {
                CustomMapI other = entries.get(j);
                if (current.equals(other)) {
                    continue;
                }

                String uidJ = other.getUid();
                // Order the two uids lexicographically; on a tie the i-side stays first.
                boolean iFirst = uidI.compareTo(uidJ) <= 0;
                String firstUid = iFirst ? uidI : uidJ;
                String secondUid = iFirst ? uidJ : uidI;

                int fi = Integer.parseInt(current.getF_val());
                int fj = Integer.parseInt(other.getF_val());
                String intersection = String.valueOf(Math.min(fi, fj));

                String pairVal = t + " " + firstUid + " " + secondUid + " " + intersection;
                mos.getCollector(PAIRS_OUTPUT, reporter)
                        .collect(new Text(PAIR_KEY), new Text(pairVal));
            }
        }
    }

    /**
     * Closes the {@code MultipleOutputs} helper.
     *
     * @throws IOException if closing the named outputs fails
     */
    public void close() throws IOException {
        mos.close();
    }
}

Job Conf 如下:

Configuration config1 = new Configuration();

          // Old-API (org.apache.hadoop.mapred) job: MapThree1 -> CustomPartitioner -> ReduceThree1.
          JobConf conf1 = new JobConf(config1, DJob.class);

          conf1.setJobName("DJob1");
          conf1.setOutputKeyClass(Text.class);
          conf1.setOutputValueClass(Text.class);
          // Configure the same job object as every other call (was "conf", not "conf1").
          conf1.setNumReduceTasks(10);
          conf1.setMapperClass(MapThree1.class);
          conf1.setReducerClass(ReduceThree1.class);
          conf1.setPartitionerClass(CustomPartitioner.class);

          conf1.setInputFormat(TextInputFormat.class);
          conf1.setOutputFormat(TextOutputFormat.class);

          // Mitigation: the named outputs below are absolute paths, so two concurrent attempts
          // of one reduce task would create the same file. Disabling speculative reduce
          // attempts removes that source of duplicates.
          // NOTE(review): this does not help if the second attempt is a retry after a failure.
          conf1.setReduceSpeculativeExecution(false);

          // ReduceThree1 writes Text keys and Text values to both named outputs, so declare
          // Text (not LongWritable) as the key class.
          MultipleOutputs.addNamedOutput(conf1, "/home/users/mlakshm/alop176/data", TextOutputFormat.class, Text.class, Text.class);
          MultipleOutputs.addNamedOutput(conf1, "/home/users/mlakshm/alop177/datepairs", TextOutputFormat.class, Text.class, Text.class);


          FileInputFormat.setInputPaths(conf1, new Path(other_args.get(2)));
          FileOutputFormat.setOutputPath(conf1, new Path(other_args.get(3)));

         JobClient.runJob(conf1);
4

1 回答 1

3

您很可能开启了推测执行(speculative execution),于是 reduce 任务 1 的两次不同尝试(attempt)同时试图写入同一路径 /home/users/mlakshm/alop176/data-r-00001。对于较小的作业,这可能不会出问题,因为任务在 Hadoop 推测性地启动第二次尝试之前就已经完成了。

我看到您的 MultipleOutputs 实现是自定义的(com.a.MultipleOutputs)。您应该将所有 HDFS 数据写入任务的工作目录,并让 OutputCommitter 在输出提交时将其移动到最终输出目录。如果可以的话,请贴出代码,我们可以一起看看。

于 2012-08-03T10:39:39.103 回答