我在 HDFS 中有 1000 多个文件可用,命名约定为1_fileName.txt
to N_fileName.txt
。每个文件的大小为 1024 MB。我需要将这些文件合并为一个(HDFS)并保持文件的顺序。说5_FileName.txt
应该只在之后附加4_fileName.txt
执行此操作的最佳和最快方法是什么。
有没有什么方法可以在不复制数据节点之间的实际数据的情况下执行这种合并?例如:获取此文件的块位置并在名称节点中使用这些块位置创建一个新条目(文件名)?
没有有效的方法可以做到这一点,您需要将所有数据移动到一个节点,然后再返回 HDFS。
执行此操作的命令行 scriptlet 可能如下所示:
hadoop fs -text *_fileName.txt | hadoop fs -put - targetFilename.txt
这会将与 glob 匹配的所有文件分类到标准输出,然后将该流通过管道传输到 put 命令并将流输出到名为 targetFilename.txt 的 HDFS 文件
你唯一的问题是你所使用的文件名结构 - 如果你有固定的宽度,零填充数字部分会更容易,但在当前状态下你会得到一个意想不到的字典顺序(1、10、100、1000 , 11, 110 等)而不是数字顺序(1,2,3,4 等)。您可以通过将 scriptlet 修改为:
hadoop fs -text [0-9]_fileName.txt [0-9][0-9]_fileName.txt \
[0-9][0-9[0-9]_fileName.txt | hadoop fs -put - targetFilename.txt
有一个 API 方法org.apache.hadoop.fs.FileUtil.copyMerge可以执行此操作:
public static boolean copyMerge(
FileSystem srcFS,
Path srcDir,
FileSystem dstFS,
Path dstFile,
boolean deleteSource,
Configuration conf,
String addString)
srcDir
它按字母顺序读取所有文件并将其内容附加到 dstFile。
如果你可以使用火花。它可以像
sc.textFile("hdfs://...../part*).coalesce(1).saveAsTextFile("hdfs://...../filename)
希望这可行,因为 spark 以分布式方式工作,您不必将文件复制到一个节点中。虽然只是一个警告,但如果文件非常大,在 spark 中合并文件可能会很慢。
由于文件顺序很重要,而字典顺序并不能达到目的,因此为这个任务编写一个映射器程序似乎是一个不错的选择,它可能会定期运行。当然没有reducer,把它写成一个HDFS map任务是高效的,因为它可以将这些文件合并到一个输出文件中,而无需在数据节点之间移动太多数据。由于源文件在 HDFS 中,并且由于映射器任务将尝试数据关联,它可以合并文件而无需跨不同数据节点移动文件。
映射程序将需要一个自定义 InputSplit(在输入目录中获取文件名并根据需要对其进行排序)和一个自定义 InputFormat。
映射器可以使用 hdfs append 或原始输出流,它可以在 byte[] 中写入。
我正在考虑的 Mapper 程序的粗略草图类似于:
public class MergeOrderedFileMapper extends MapReduceBase implements Mapper<ArrayWritable, Text, ??, ??>
{
FileSystem fs;
public void map(ArrayWritable sourceFiles, Text destFile, OutputCollector<??, ??> output, Reporter reporter) throws IOException
{
//Convert the destFile to Path.
...
//make sure the parent directory of destFile is created first.
FSDataOutputStream destOS = fs.append(destFilePath);
//Convert the sourceFiles to Paths.
List<Path> srcPaths;
....
....
for(Path p: sourcePaths) {
FSDataInputStream srcIS = fs.open(p);
byte[] fileContent
srcIS.read(fileContent);
destOS.write(fileContent);
srcIS.close();
reporter.progress(); // Important, else mapper taks may timeout.
}
destOS.close();
// Delete source files.
for(Path p: sourcePaths) {
fs.delete(p, false);
reporter.progress();
}
}
}
我为 PySpark 编写了一个实现,因为我们经常使用它。
以 Hadoop 为模型copyMerge()
并使用相同的较低级别的 Hadoop API 来实现这一点。
https://github.com/Tagar/abalon/blob/v2.3.3/abalon/spark/sparkutils.py#L335
它保持文件名的字母顺序。