1

我对 hadoop 相当陌生,我在 5 节点集群上运行多个 mapReduce 作业。运行多个线程时,我已经开始出现“文件系统关闭”异常。一次运行一个作业时工作正常。错误出现在映射之后,就在减少之前。它看起来像这样:

java.lang.Exception: java.io.IOException: Filesystem closed
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:399)
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:552)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:648)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:706)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:167)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:526)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:338)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:231)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

这不会一直发生,如果我重新执行失败的作业,它会运行良好。不幸的是,这会占用太多时间。我假设这与访问同一个输入文件的多个任务有关,当一个任务完成时,它会关闭所有任务的输入文件。如果这是我想知道的问题是如何覆盖它。我尝试在映射器中覆盖清理以重新打开路径,但这似乎很愚蠢并且不起作用。

@Override 
public void cleanup(Context context){
        Job tempJob;
        try {
            tempJob = new Job();
            Path fs = ((FileSplit) context.getInputSplit()).getPath();
            FileInputFormat.addInputPath(tempJob, fs);
            System.out.println("Finished map task for " + context.getJobName());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

我还想知道这是否是使用线程池执行 hadoop mapReduce 作业的基本问题。感谢您的任何想法。

编辑:当我提到工作和任务时,我可能有点不清楚。我实际上正在使用他们自己的映射器和减速器运行多个作业。这些作业中的每一个都会为我正在创建的特定表生成一列。说一个总和或一个计数。每个作业都有自己的线程,它们都在访问同一个输入文件。我遇到的问题是,当一些工作完成时,他们会抛出“文件系统关闭异常”。如果这可能会有所作为,我也在使用 Yarn。

4

1 回答 1

1

作为一般规则,除非您有一个 CPU 密集型工作,否则我不建议在同一任务中使用多个线程,这会增加 JVM 出现问题的可能性,并且重新运行任务的成本会高得多。您可能应该考虑增加映射任务的数量,当然每个任务都将在单独的 JVM 中运行,但这样会更干净。

如果您真的想采用多线程方式,那么我怀疑您使用了错误类型的映射器,对于多线程应用程序,您应该使用 aMultithreadedMapper具有不同的run方法实现并且应该是线程安全的。你可以像这样使用它:

job.setMapperClass(MultithreadedMapper.class);

您可以像这样指定线程数:

int numThreads = 42;
MultithreadedMapper.setNumberOfThreads(numThreads);
于 2013-03-13T01:51:58.220 回答