I'm new to this area, but here is my idea.
You could use the traditional BFS algorithm along the lines of the pseudocode below.
In each iteration you launch a Hadoop job that discovers all the child nodes of the current working set that have not been visited yet.
BFS(list curNodes, list visited, int depth) {
    if (depth <= 0) {
        return visited;
    }
    // run a Hadoop job on the current working set curNodes, restricted by visited;
    // the job populates a result list with the child nodes of the current working set
    visited.addAll(result);
    curNodes.clear();
    curNodes.addAll(result);
    return BFS(curNodes, visited, depth - 1);
}
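If you want to avoid the recursion, the same driver logic can be written as a plain loop. This is only a sketch of that idea, not a full implementation; runHadoopJob() is a hypothetical placeholder for one MapReduce pass over the edge list:

import java.util.HashSet;
import java.util.Set;

public class BfsDriverSketch {

    // Hypothetical placeholder for one MapReduce pass: it should return the
    // children of the frontier vertices that are not yet in visited.
    static Set<Integer> runHadoopJob(Set<Integer> frontier, Set<Integer> visited)
            throws Exception {
        throw new UnsupportedOperationException("run the Hadoop job here");
    }

    static Set<Integer> bfs(Set<Integer> start, int maxDepth) throws Exception {
        Set<Integer> visited = new HashSet<Integer>(start);
        Set<Integer> frontier = new HashSet<Integer>(start);
        for (int depth = 0; depth < maxDepth && !frontier.isEmpty(); depth++) {
            Set<Integer> result = runHadoopJob(frontier, visited); // one job per level
            visited.addAll(result); // newly discovered vertices become visited
            frontier = result;      // and form the next working set
        }
        return visited;
    }
}

Calling bfs with a set containing just vertex 1 and a depth of 3 would expand three levels outwards from vertex 1.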
The mapper and reducer for this job would look as follows.
In this example I simply use static members to hold the working set, the visited set, and the result set.
It should really be implemented with temporary files; there are probably ways to optimize how the temporary data accumulated from one iteration to the next is persisted.
The input file I used for the job contains the list of edges, one pair per line, e.g. 1,2 2,3 5,4 ... ...
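For instance, a tiny made-up input could look like this:

1,2
2,3
5,4

Starting from vertex 1, the first iteration would discover vertex 2 and the second would discover vertex 3, while vertices 4 and 5 are never reached.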
public static class VertexMapper extends
        Mapper<Object, Text, IntWritable, IntWritable> {

    // Working state held in static members for simplicity (see the note above);
    // in a real job this would be read from temporary files.
    private static Set<IntWritable> curVertex = null;
    private static IntWritable curLevel = null;
    private static Set<IntWritable> visited = null;

    private IntWritable src = new IntWritable();
    private IntWritable dst = new IntWritable();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is one edge of the form "src,dst".
        StringTokenizer itr = new StringTokenizer(value.toString(), ",");
        if (itr.countTokens() == 2) {
            String keyStr = itr.nextToken();
            String valueStr = itr.nextToken();
            try {
                this.src.set(Integer.parseInt(keyStr));
                this.dst.set(Integer.parseInt(valueStr));
                // Emit the child only if the edge starts in the current frontier,
                // the child has not been visited yet, and it is not a self-loop.
                if (VertexMapper.curVertex.contains(this.src)
                        && !VertexMapper.visited.contains(this.dst)
                        && !this.src.equals(this.dst)) {
                    context.write(VertexMapper.curLevel, this.dst);
                }
            } catch (NumberFormatException e) {
                System.err.println("Found key,value <" + keyStr + "," + valueStr
                        + "> which cannot be parsed as int");
            }
        } else {
            System.err.println("Found malformed line: " + value.toString());
        }
    }
}
public static class UniqueReducer extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    // Collects the unique child vertices discovered in this iteration;
    // again a static member instead of a temporary file, for simplicity.
    private static Set<IntWritable> result = new HashSet<IntWritable>();

    public void reduce(IntWritable key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        for (IntWritable val : values) {
            UniqueReducer.result.add(new IntWritable(val.get()));
        }
        // Nothing is written to the job output; the driver consumes the result set.
        // context.write(key, key);
    }
}
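As I said, the static members are only a shortcut that works while everything runs in a single JVM (e.g. local mode). One way to get rid of them, sketched below under my own assumptions, is to write the frontier and visited vertex ids to HDFS files (one id per line) and pass the paths through the Configuration under made-up keys such as bfs.frontier.path and bfs.visited.path; the fragment additionally needs imports for java.io.BufferedReader, java.io.InputStreamReader, org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.Path:

public static class FileBackedVertexMapper extends
        Mapper<Object, Text, IntWritable, IntWritable> {

    private final Set<Integer> curVertex = new HashSet<Integer>();
    private final Set<Integer> visited = new HashSet<Integer>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the per-iteration state from HDFS instead of static members.
        FileSystem fs = FileSystem.get(context.getConfiguration());
        readIds(fs, new Path(context.getConfiguration().get("bfs.frontier.path")), curVertex);
        readIds(fs, new Path(context.getConfiguration().get("bfs.visited.path")), visited);
    }

    // Reads one integer vertex id per line into the target set.
    private static void readIds(FileSystem fs, Path path, Set<Integer> target)
            throws IOException {
        BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(path)));
        try {
            String line;
            while ((line = in.readLine()) != null) {
                target.add(Integer.parseInt(line.trim()));
            }
        } finally {
            in.close();
        }
    }

    // map() would be the same as in VertexMapper above, checking curVertex and visited.
}

The driver would then set those configuration keys with conf.set(...) and rewrite the files between iterations instead of touching static state.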
Running one iteration of the job would then go something like this:
// Seed the state for one BFS iteration: start from vertex 1,
// which therefore also counts as already visited.
UniqueReducer.result.clear();
VertexMapper.curLevel = new IntWritable(1);
VertexMapper.curVertex = new HashSet<IntWritable>(1);
VertexMapper.curVertex.add(new IntWritable(1));
VertexMapper.visited = new HashSet<IntWritable>(1);
VertexMapper.visited.add(new IntWritable(1));

// Configure and run the job for this level.
Configuration conf = getConf();
Job job = new Job(conf, "BFS");
job.setJarByClass(BFSExample.class);
job.setMapperClass(VertexMapper.class);
job.setCombinerClass(UniqueReducer.class);
job.setReducerClass(UniqueReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
// The reducer only fills the static result set, so no file output is needed.
job.setOutputFormatClass(NullOutputFormat.class);
boolean result = job.waitForCompletion(true);

// And in main(), the tool is started through ToolRunner:
BFSExample bfs = new BFSExample();
ToolRunner.run(new Configuration(), bfs, args);
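For completeness, this is roughly how I would expect the surrounding class to be wired up; the skeleton below is my assumption, since only the body of run() and the ToolRunner call are shown above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BFSExample extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // The per-iteration state initialisation and job setup shown above go here,
        // wrapped in the level loop from the pseudocode.
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses the generic Hadoop options and then calls run().
        int exitCode = ToolRunner.run(new Configuration(), new BFSExample(), args);
        System.exit(exitCode);
    }
}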