2

我的工作流程如下:

我正在处理大量数据。我有一个MapFile需要缓存的。这个文件的大小现在是 1 GB,但我希望它最终会增长。

MapFile 的内容是这样的:

12345,45464       192.34.23.1
33214,45321       123.45.32.1
  • 在 中map-phase,我处理来自输入文件的每条记录TextInputFormat。我解析该行(由标记分割)并检索前两个标记,token1 和 token2。

如果 (token1,token2) 对不在缓存文件中,那么我调用 API,获取信息,保存在缓存中(如果可能)并继续处理。

 private Parser parser = new customParser();

protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      parser.parse(value);
      Pair pair = new Pair();
      pair.setFirst(parser.getFirst());
      pair.setSecond(parser.getSecond());
      IP ip = null;

      //here is the catch
      //check if pair exists in cache
      if cache.contains(pair){
          ip=cache.get(pair);
       }
       else {
          ip=getFromAPI(pair);//This does API call outside network.
          cache.put(pair,ip);
       }
      context.write(pair,ip);
      }
    }

我在这里看到的主要问题是

  1. 如何在所有节点的缓存中获取大文件。DistributedCache 通过将文件复制到本地节点来工作。但由于这个文件更大,这里涉及到网络流量,对于我的日常工作,我不想继续分发它。

  2. 如何高效查找 MapFile(cache),整个 mapFile 不会在内存中。

  3. 如何写入作为我的缓存的 MapFile。

谢谢

4

1 回答 1

0

正如我所看到的,有三种方法可以处理这个问题,最好的一种取决于你的缓存文件将如何增长。

  1. 如果您不希望缓存文件增长太多,并且它始终可以放入内存而不妨碍其他应用程序或 MapReduce 作业,您可以将其放入HDFS 缓存中。自 Hadoop 2.3.0 起支持此功能:

    HDFS 缓存允许用户在 HDFS 中显式缓存某些文件或目录。然后,DataNodes 将通过使用 mmap 和 mlock 将相应的块缓存在堆外内存中。缓存后,Hadoop 应用程序可以查询缓存块的位置并将其任务放置在内存局部性中。最后,当内存本地时,应用程序可以使用新的零拷贝读取 API 来读取缓存数据,而无需额外开销。

如果随着缓存文件的增长而无法安全地将其保存在内存中,则最后两个选项更合适:

  1. Thomas Jungblut 的这个答案建议将您的缓存文件放入 HDFS,增加复制计数并使用FileSystem API读取它。这仍然会导致非本地副本的网络通信,但希望少于到 DistributedCache 中所有节点的传输。FileSystem API 还允许您附加到现有文件,让您更新文件。

  2. 如果您的缓存文件将增长得如此之多,以至于您在存储额外的复制时可能会遇到问题,那么您可能希望考虑让它作为第一个映射步骤的一部分进行检索。

    例如,您可以将缓存文件和要处理的文件作为映射器的输入,并为这两个输入映射令牌对。在 reduce 步骤中,如果令牌对在缓存文件和已处理文件中都有一行,则不输出任何内容,并在其他两种可能的情况下输出相应的缓存行,从而构建新的缓存文件。

于 2014-11-02T13:43:33.633 回答