我想使用一个 HashSet 在映射时对一个文件存在/起作用,然后在映射下一个文件时重置/重新创建。我已修改 TextInputFormat 以覆盖 isSplitable 以返回 false,这样文件就不会被拆分,而是由 Mappers 整体处理。有可能做这样的事情吗?还是有另一种方法可以减少对 Accumulo 表的写入?
让我从我不相信我想要一个全局变量开始。我只想确保唯一性,从而在我的 Accumulo 表中写入更少的突变。
我的项目是将分片示例中的 Index.java 文件的功能从线性 accumulo 客户端程序转换为使用 mapreduce 功能的程序,同时仍然在 Accumulo 中创建相同的表。它需要 mapreduce,因为这是流行语,本质上它比针对 TB 数据的线性程序运行得更快。
以下是索引代码供参考: http: //grepcode.com/file/repo1.maven.org/maven2/org.apache.accumulo/examples-simple/1.4.0/org/apache/accumulo/examples/simple/分片/Index.java
该程序使用 BatchWriter 将 Mutations 写入 Accumulo 并在每个文件的基础上进行。为了确保它不会写入不必要的突变并确保唯一性(尽管我确实相信 Accumulo 最终会通过压缩合并相同的键),Index.java 有一个 HashSet 用于确定之前是否已经运行过一个单词。这一切都比较容易理解。
转移到仅 map 的 mapreduce 作业更加复杂。
这是我的映射尝试,从我看到的 Accumulo 表的部分输出来看,这似乎有点工作,但与线性程序 Index.java 相比,它的运行速度真的很慢
public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
private HashSet<String> tokensSeen = new HashSet<String>();
@Override
public void map(LongWritable key, Text value, Context output) throws IOException {
FileSplit fileSplit = (FileSplit)output.getInputSplit();
System.out.println("FilePath " + fileSplit.getPath().toString());
String filePath = fileSplit.getPath().toString();
filePath = filePath.replace("unprocessed", "processed");
String[] words = value.toString().split("\\W+");
for (String word : words) {
Mutation mutation = new Mutation(genPartition(filePath.hashCode() % 10));
word = word.toLowerCase();
if(!tokensSeen.contains(word)) {
tokensSeen.add(word);
mutation.put(new Text(word), new Text(filePath), new Value(new byte[0]));
}
try {
output.write(null, mutation);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
缓慢的问题可能是我在一个测试实例上运行所有这些,这是一个带有 ZooKeeper 和 Accumulo 的单节点 Hadoop 实例。如果是这样,我只需要找到唯一性的解决方案。
非常感谢提供的任何帮助或建议。