1

我刚刚开始研究基于 hadoop 的开放街道地图数据摄取器。有几种格式 - 但我一直针对基于协议缓冲区的格式(注意 - 它不是纯 pb)。

在我看来,将文件预拆分为序列文件会更有效 - 而不是以自定义记录阅读器/输入格式处理可变长度编码 - 但需要进行完整性检查。

PBF 格式描述中更详细地描述了该格式 但基本上它是 [BlobHeader,Blob] 块的集合。

有一个 Blob 标头

message BlobHeader {
   required string type = 1;
   optional bytes indexdata = 2;
   required int32 datasize = 3;
 }

然后是 Blob(其大小由标头中的 datasize 参数定义)

 message Blob {
   optional bytes raw = 1; // No compression
   optional int32 raw_size = 2; // Only set when compressed, to the uncompressed size
   optional bytes zlib_data = 3;
   // optional bytes lzma_data = 4; // PROPOSED.
   // optional bytes OBSOLETE_bzip2_data = 5; // Deprecated.
 }

一旦你进入blob显然会有更多的结构 - 但我会在映射器中处理它 - 我想做的是最初每个映射器有一个blob(后来可能是每个映射器的一些blob)。

其他一些输入格式/记录读取器使用“足够大”的拆分大小,然后向后/向前搜索到分隔符 - 但由于没有分隔符可以让我知道 blob/headers 的偏移量 - 并且没有索引也指向他们 - 如果不首先通过文件流式传输,我看不到任何方法来获得我的分割点。

现在我不需要从磁盘上实际读取整个文件——我可以从读取标题开始,使用该信息查找 blob,将其设置为第一个分割点,然后重复。但这是我能想到的预分割成序列文件的唯一替代方法。

有没有更好的方法来处理这个 - 或者如果没有,对这两个建议的想法?

4

1 回答 1

4

好吧,我在 getSplits 方法中解析二进制文件 - 因为我跳过了 99% 的数据,所以速度非常快(planet-osm 22GB 世界文件约为 20 秒)。如果其他人偶然发现,这是 getSplits 方法。

@Override
public List<InputSplit> getSplits(JobContext context){
    List<InputSplit> splits = new ArrayList<InputSplit>();
    FileSystem fs = null;
    Path file = OSMPBFInputFormat.getInputPaths(context)[0]; 
    FSDataInputStream in = null;
    try {
        fs = FileSystem.get(context.getConfiguration());
        in = fs.open(file);
        long pos = 0;
        while (in.available() > 0){
            int len = in.readInt(); 
            byte[] blobHeader = new byte[len]; 
            in.read(blobHeader);
            BlobHeader h = BlobHeader.parseFrom(blobHeader);
            FileSplit split = new FileSplit(file, pos,len + h.getDatasize(), new String[] {});
            splits.add(split);
            pos += 4;
            pos += len;
            pos += h.getDatasize();
            in.skip(h.getDatasize());
        }
    } catch (IOException e) {
        sLogger.error(e.getLocalizedMessage());
    } finally {
        if (in != null) {try {in.close();}catch(Exception e){}};
        if (fs != null) {try {fs.close();}catch(Exception e){}};
    }
    return splits;
}

到目前为止工作正常 - 虽然我还没有对输出进行真实的测试。它肯定比将 pbf 复制到 hdfs、在单个映射器中转换为序列、然后摄取(复制时间占主导地位)更快。它也比将外部程序复制到 hdfs 中的序列文件,然后针对 hdfs 运行映射器(后者编写脚本)快约 20%。所以这里没有抱怨。

请注意,这会为每个块生成一个映射器 - 行星世界文件的映射器约为 23k。我实际上是在每次拆分捆绑多个块 - 只需循环 x 次,然后将拆分添加到集合中。

对于 BlobHeader,我刚刚从上面的 OSM wiki 链接编译了 protobuf .proto 文件。如果需要,您还可以从 OSM-binary 类中提取预先生成的 - maven 片段是:

<dependency>
    <groupId>org.openstreetmap.osmosis</groupId>
    <artifactId>osmosis-osm-binary</artifactId>
    <version>0.43-RELEASE</version>
</dependency>
于 2013-11-20T01:41:11.923 回答