I'm working on a Hadoop project. My reduce phase is very memory-intensive: I'm currently using a HashMap, but I get Error: Java Heap space because I build a huge hashmap in the reduce (the dataset is 32 GB). A solution could be an in-memory hashmap with a disk fallback, and MapDB seems to fit my needs, but I'm not sure about the usage. The diskMap is unique per reduce task, while the inMemory map is unique per reduce 'key'. Even though I set expireMaxSize(3) for testing, I'm not sure when the onDisk map is used and whether the logic is correct. Also for testing, I fill the hashmap with 20 fake entries. Basically, to avoid heap overflow, I need to control the growth of the inMemory map.
import java.io.File;
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.HTreeMap;
import org.mapdb.Serializer;

public class TestReducer extends Reducer<LongWritable, BytesWritable, String, IntWritable> {

    private int id;
    private DB dbDisk;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        id = context.getTaskAttemptID().getTaskID().getId();
        // one file-backed DB per reduce task; remove any stale file from a previous attempt
        File diskmap = new File("tmp/diskmap" + id);
        diskmap.delete();
        dbDisk = DBMaker
                .fileDB("tmp/diskmap" + id)
                .make();
    }

    @Override
    protected void reduce(LongWritable key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        // in-memory DB created per reduce key
        DB dbMemory = DBMaker
                .memoryDB()
                .make();
        // disk-backed map shared by the whole task; receives entries evicted from inMemory
        HTreeMap<Long, Integer> onDisk = dbDisk
                .hashMap("onDisk")
                .keySerializer(Serializer.LONG)
                .valueSerializer(Serializer.INTEGER)
                .createOrOpen();
        // fast in-memory collection with limited size
        HTreeMap<Long, Integer> inMemory = dbMemory
                .hashMap("inMemory")
                .expireMaxSize(3)
                .keySerializer(Serializer.LONG)
                .valueSerializer(Serializer.INTEGER)
                // this registers overflow to `onDisk`
                .expireOverflow(onDisk)
                .createOrOpen();

        // fill with 20 fake entries for testing
        for (int k = 0; k < 20; k++) {
            inMemory.put((long) k, k * 2);
        }

        for (Map.Entry<Long, Integer> entry : inMemory.entrySet()) {
            System.out.print("Key is: " + entry.getKey() + " & ");
            System.out.println("Value is: " + entry.getValue());
        }

        // release the per-key in-memory DB once this key is processed
        dbMemory.close();
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        dbDisk.close();
    }
}
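To see for myself when entries actually move to the onDisk map, I was thinking of a small standalone test outside Hadoop with just the MapDB part. This is only a sketch: the class name is made up, it assumes DBMaker.tempFileDB() is available for a throwaway file-backed DB, and it simply watches the sizes of both maps while putting entries.

import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.HTreeMap;
import org.mapdb.Serializer;

public class OverflowSketch {
    public static void main(String[] args) {
        // throwaway file-backed DB standing in for the per-task diskmap file
        DB dbDisk = DBMaker.tempFileDB().make();
        DB dbMemory = DBMaker.memoryDB().make();

        HTreeMap<Long, Integer> onDisk = dbDisk
                .hashMap("onDisk")
                .keySerializer(Serializer.LONG)
                .valueSerializer(Serializer.INTEGER)
                .createOrOpen();

        HTreeMap<Long, Integer> inMemory = dbMemory
                .hashMap("inMemory")
                .expireMaxSize(3)
                .keySerializer(Serializer.LONG)
                .valueSerializer(Serializer.INTEGER)
                .expireOverflow(onDisk)
                .createOrOpen();

        for (int k = 0; k < 20; k++) {
            inMemory.put((long) k, k * 2);
            // print both sizes after every put to observe when eviction to onDisk happens
            System.out.println("after put " + k
                    + ": inMemory=" + inMemory.size()
                    + ", onDisk=" + onDisk.size());
        }

        dbMemory.close();
        dbDisk.close();
    }
}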