您好我是使用 Amazon EMR 和 Hadoop 的新手。我想知道如何从 EMR 作业中读取外部文件(存储在 S3 中)。例如,我有一个包含一长串列入黑名单的字符串的文件。当我的 EMR 作业正在处理我的输入时,我如何让作业事先读取此列入黑名单的字符串列表,以便在处理期间使用它?
我尝试使用常规的 Java Scanner 类并对文件的 S3 路径进行硬编码,但这似乎不起作用,尽管我可能做错了......
您好我是使用 Amazon EMR 和 Hadoop 的新手。我想知道如何从 EMR 作业中读取外部文件(存储在 S3 中)。例如,我有一个包含一长串列入黑名单的字符串的文件。当我的 EMR 作业正在处理我的输入时,我如何让作业事先读取此列入黑名单的字符串列表,以便在处理期间使用它?
我尝试使用常规的 Java Scanner 类并对文件的 S3 路径进行硬编码,但这似乎不起作用,尽管我可能做错了......
我会做这样的事情(对不起,代码是scala而不是java,但它是一样的)
将路径作为参数传递给您的 main 方法
将其设置为配置中的属性
val conf = new Configuration()
conf.set("blacklist.file", args(0))
var blacklist: List[String] = List()
override def setup(context: Context) {
val path = new Path(context.getConfiguration.get("blacklist.file"))
val fileSystem = FileSystem.get(path.toUri, context.getConfiguration)
blacklist = scala.io.Source.fromInputStream(fileSystem.open(path)).getLines.toList
}
如果您可以按如下方式将此文件添加到分布式缓存中会更好:
...
String s3FilePath = args[0];
DistributedCache.addCacheFile(new URI(s3FilePath), conf);
...
稍后,在 mapper/reducer 的 configure() 中,您可以执行以下操作:
...
Path s3FilePath;
@Override
public void configure(JobConf job) {
s3FilePath = DistributedCache.getLocalCacheFiles(job)[0];
FileInputStream fstream = new FileInputStream(s3FilePath.toString());
// Read the file and build a HashMap/List or something which can be accessed from map/reduce methods as desired.
...
}