如何在映射器中获取输入文件的名称?我在输入目录中存储了多个输入文件,每个映射器可能读取不同的文件,我需要知道映射器读取了哪个文件。
11 回答
First you need to get the input split, using the newer mapreduce API it would be done as follows:
context.getInputSplit();
But in order to get the file path and the file name you will need to first typecast the result into FileSplit.
So, in order to get the input file path you may do the following:
Path filePath = ((FileSplit) context.getInputSplit()).getPath();
String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString();
Similarly, to get the file name, you may just call upon getName(), like this:
String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
在您的映射器中使用它:
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String filename = fileSplit.getPath().getName();
编辑 :
如果您想通过旧 API在configure()中执行此操作,请尝试以下操作:
String fileName = new String();
public void configure(JobConf job)
{
filename = job.get("map.input.file");
}
如果您使用的是Hadoop Streaming,您可以在流式作业的映射器/缩减器中使用 JobConf 变量。
至于映射器的输入文件名,请参阅配置参数部分,map.input.file
变量(映射正在读取的文件名)是可以完成工作的变量。但请注意:
注意:在流式作业执行期间,“mapred”参数的名称会被转换。点 (.) 变为下划线 (_)。例如,mapred.job.id 变为 mapred_job_id,mapred.jar 变为 mapred_jar。要获取流式作业的映射器/缩减器中的值,请使用带下划线的参数名称。
例如,如果您使用的是 Python,那么您可以将这一行放在您的映射器文件中:
import os
file_name = os.getenv('map_input_file')
print file_name
如果您使用的是常规 InputFormat,请在 Mapper 中使用它:
InputSplit is = context.getInputSplit();
Method method = is.getClass().getMethod("getInputSplit");
method.setAccessible(true);
FileSplit fileSplit = (FileSplit) method.invoke(is);
String currentFileName = fileSplit.getPath().getName()
如果您使用的是 CombineFileInputFormat,这是一种不同的方法,因为它将几个小文件组合成一个相对较大的文件(取决于您的配置)。Mapper 和 RecordReader 都在同一个 JVM 上运行,因此您可以在运行时在它们之间传递数据。您需要实现自己的 CombineFileRecordReaderWrapper 并执行以下操作:
public class MyCombineFileRecordReaderWrapper<K, V> extends RecordReader<K, V>{
...
private static String mCurrentFilePath;
...
public void initialize(InputSplit combineSplit , TaskAttemptContext context) throws IOException, InterruptedException {
assert this.fileSplitIsValid(context);
mCurrentFilePath = mFileSplit.getPath().toString();
this.mDelegate.initialize(this.mFileSplit, context);
}
...
public static String getCurrentFilePath() {
return mCurrentFilePath;
}
...
然后,在您的 Mapper 中,使用以下命令:
String currentFileName = MyCombineFileRecordReaderWrapper.getCurrentFilePath()
希望我有所帮助:-)
在使用旧api的 Hadoop 2.4 及更高版本上注意到此方法会产生空值
String fileName = new String();
public void configure(JobConf job)
{
fileName = job.get("map.input.file");
}
或者,您可以利用传递给 map 函数的 Reporter 对象来获取 InputSplit 并转换为 FileSplit 以检索文件名
public void map(LongWritable offset, Text record,
OutputCollector<NullWritable, Text> out, Reporter rptr)
throws IOException {
FileSplit fsplit = (FileSplit) rptr.getInputSplit();
String inputFileName = fsplit.getPath().getName();
....
}
您必须首先通过类型转换转换为 InputSplit,然后您需要将类型转换为 FileSplit。
例子:
InputSplit inputSplit= (InputSplit)context.getInputSplit();
Path filePath = ((FileSplit) inputSplit).getPath();
String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString()
主张转换为的答案FileSplit
将不再有效,因为FileSplit
不再为多个输入返回实例(因此您将获得 a ClassCastException
)。相反,org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit
会返回实例。不幸的是,TaggedInputSplit
不使用反射就无法访问该类。所以这是我为此编写的一个实用程序类。做就是了:
Path path = MapperUtils.getPath(context.getInputSplit());
在你的Mapper.setup(Context context)
方法中。
这是我的MapperUtils
课程的源代码:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.reflect.Method;
import java.util.Optional;
public class MapperUtils {
public static Path getPath(InputSplit split) {
return getFileSplit(split).map(FileSplit::getPath).orElseThrow(() ->
new AssertionError("cannot find path from split " + split.getClass()));
}
public static Optional<FileSplit> getFileSplit(InputSplit split) {
if (split instanceof FileSplit) {
return Optional.of((FileSplit)split);
} else if (TaggedInputSplit.clazz.isInstance(split)) {
return getFileSplit(TaggedInputSplit.getInputSplit(split));
} else {
return Optional.empty();
}
}
private static final class TaggedInputSplit {
private static final Class<?> clazz;
private static final MethodHandle method;
static {
try {
clazz = Class.forName("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit");
Method m = clazz.getDeclaredMethod("getInputSplit");
m.setAccessible(true);
method = MethodHandles.lookup().unreflect(m).asType(
MethodType.methodType(InputSplit.class, InputSplit.class));
} catch (ReflectiveOperationException e) {
throw new AssertionError(e);
}
}
static InputSplit getInputSplit(InputSplit o) {
try {
return (InputSplit) method.invokeExact(o);
} catch (Throwable e) {
throw new AssertionError(e);
}
}
}
private MapperUtils() { }
}
这帮助了我:
String fileName = ((org.apache.hadoop.mapreduce.lib.input.FileSplit) context.getInputSplit()).getPath().getName();
对于org.apache.hadood.mapred
包,地图功能签名应为:
map(Object, Object, OutputCollector, Reporter)
因此,要在 map 函数中获取文件名,您可以像这样使用 Reporter 对象:
String fileName = ((FileSplit) reporter.getInputSplit()).getPath().getName();
package com.foo.bar;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.reflect.Method;
public class MapperUtils {
public static Path getPath(InputSplit split) {
FileSplit fileSplit = getFileSplit(split);
if (fileSplit == null) {
throw new AssertionError("cannot find path from split " + split.getClass());
} else {
return fileSplit.getPath();
}
}
public static FileSplit getFileSplit(InputSplit split) {
if (split instanceof FileSplit) {
return (FileSplit)split;
} else if (TaggedInputSplit.clazz.isInstance(split)) {
return getFileSplit(TaggedInputSplit.getInputSplit(split));
} else {
return null;
}
}
private static final class TaggedInputSplit {
private static final Class<?> clazz;
private static final MethodHandle method;
static {
try {
clazz = Class.forName("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit");
Method m = clazz.getDeclaredMethod("getInputSplit");
m.setAccessible(true);
method = MethodHandles.lookup().unreflect(m).asType(
MethodType.methodType(InputSplit.class, InputSplit.class));
} catch (ReflectiveOperationException e) {
throw new AssertionError(e);
}
}
static InputSplit getInputSplit(InputSplit o) {
try {
return (InputSplit) method.invokeExact(o);
} catch (Throwable e) {
throw new AssertionError(e);
}
}
}
private MapperUtils() { }
}
我重写了 hans-brende 在 Java 7 中提供的代码,它起作用了。但是有一个问题是
文件输入格式计数器 Bytes Read=0 Bytes Read 如果使用 MultipleInputs,则为零。
像这样的多个输入:
-Dwordcount.case.sensitive=false
hdfs://192.168.178.22:9000/user/hduser/inWiki
hdfs://192.168.178.22:9000/user/hduser/outWiki1
hdfs://192.168.178.22:9000/user/joe/wordcount/dict/dictionary.txt
-skip hdfs://192.168.178.22:9000/user/joe/wordcount/patterns.txt
对于该文件dictionary.txt
,我在 Map Code 中编写了一个程序