0

我想使用一个 HashSet 在映射时对一个文件存在/起作用,然后在映射下一个文件时重置/重新创建。我已修改 TextInputFormat 以覆盖 isSplitable 以返回 false,这样文件就不会被拆分,而是由 Mappers 整体处理。有可能做这样的事情吗?还是有另一种方法可以减少对 Accumulo 表的写入?

让我从我不相信我想要一个全局变量开始。我只想确保唯一性,从而在我的 Accumulo 表中写入更少的突变。

我的项目是将分片示例中的 Index.java 文件的功能从线性 accumulo 客户端程序转换为使用 mapreduce 功能的程序,同时仍然在 Accumulo 中创建相同的表。它需要 mapreduce,因为这是流行语,本质上它比针对 TB 数据的线性程序运行得更快。

以下是索引代码供参考: http: //grepcode.com/file/repo1.maven.org/maven2/org.apache.accumulo/examples-simple/1.4.0/org/apache/accumulo/examples/simple/分片/Index.java

该程序使用 BatchWriter 将 Mutations 写入 Accumulo 并在每个文件的基础上进行。为了确保它不会写入不必要的突变并确保唯一性(尽管我确实相信 Accumulo 最终会通过压缩合并相同的键),Index.java 有一个 HashSet 用于确定之前是否已经运行过一个单词。这一切都比较容易理解。

转移到仅 map 的 mapreduce 作业更加复杂。

这是我的映射尝试,从我看到的 Accumulo 表的部分输出来看,这似乎有点工作,但与线性程序 Index.java 相比,它的运行速度真的很慢

public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
        private HashSet<String> tokensSeen = new HashSet<String>();
        @Override
        public void map(LongWritable key, Text value, Context output) throws IOException {
            FileSplit fileSplit = (FileSplit)output.getInputSplit();
            System.out.println("FilePath " + fileSplit.getPath().toString());
            String filePath = fileSplit.getPath().toString();
            filePath = filePath.replace("unprocessed", "processed");

            String[] words = value.toString().split("\\W+");

            for (String word : words) {
                Mutation mutation = new Mutation(genPartition(filePath.hashCode() % 10));
                word = word.toLowerCase();
                if(!tokensSeen.contains(word)) {
                    tokensSeen.add(word);
                    mutation.put(new Text(word), new Text(filePath), new Value(new byte[0]));
                }

                try {
                    output.write(null, mutation);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

缓慢的问题可能是我在一个测试实例上运行所有这些,这是一个带有 ZooKeeper 和 Accumulo 的单节点 Hadoop 实例。如果是这样,我只需要找到唯一性的解决方案。

非常感谢提供的任何帮助或建议。

4

1 回答 1

1

Mapper 具有您可以覆盖setupcleanup方法以更干净地处理此类事情。setup被调用一次,然后map被调用多次(每条记录一次),然后cleanup在最后被调用一次。这个想法是您在setup方法中创建 HashSet,在 中构建它map,然后在 中提交所有内容,或者在必要时cleanup定期刷新一些调用。map

但是,在迁移到真正的集群之前,您几乎肯定不会看到运行时的任何改进。与简单的线性程序相比,单节点测试实例几乎没有任何优势,只是在获得真正的 hadoop 集群后,相同的代码将运行得更快。

于 2013-09-25T22:24:24.180 回答