Do you know how to implement this algorithm using the MapReduce paradigm?

def getFriends(self, degree):
    # Collect every user reachable within `degree` hops of self.
    friendList = []
    self._getFriends(degree, friendList)
    return friendList

def _getFriends(self, degree, friendList):
    # Depth-limited traversal: appends self first, then recurses into each
    # friend; on a bidirectional graph this can append the same user twice.
    friendList.append(self)
    if degree:
        for friend in self.friends:
            friend._getFriends(degree - 1, friendList)

Suppose we have the following bidirectional friendships:

(1,2), (1,3), (1,4), (4,5), (4,6), (5,7), (5,8)

For example, how do I get user 1's level-1, level-2, and level-3 connections? The answer must be 1 -> 2, 3, 4, 5, 6, 7, 8.

Thanks

3 Answers

As far as I understand, you want to collect all friends in the n-th circle of some person in a social graph. Most graph algorithms are recursive, and recursion does not suit the MapReduce way of solving tasks well.

I can suggest using Apache Giraph for this (under the hood it actually uses MapReduce). It is mostly asynchronous: you write jobs that describe the behaviour of a single node, for example:

1. Send a message from the root node to all friends to get their friend lists.
2.1. Each friend sends a message with its friend list to the root node.
2.2. Each friend sends a message to all its sub-friends to get their friend lists.
3.1. Each sub-friend sends a message with its friend list to the root node.
3.2. Each sub-friend sends a message to all its sub-sub-friends to get their friend lists.
...
N. The root node collects all these messages and merges them into a single list.
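
A minimal sketch of that message flow against Giraph's BasicComputation API. The class name NHopFriends, the ROOT_ID and MAX_DEGREE constants, and the choice to store the hop count in the vertex value are my assumptions, not something given in the question:

import java.io.IOException;

import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;

// Vertex value = hop count at which the vertex was first reached (-1 = unreached).
public class NHopFriends extends BasicComputation<
    LongWritable, LongWritable, NullWritable, LongWritable> {

  private static final long ROOT_ID = 1;    // assumed root user
  private static final long MAX_DEGREE = 3; // assumed circle depth

  @Override
  public void compute(Vertex<LongWritable, LongWritable, NullWritable> vertex,
      Iterable<LongWritable> messages) throws IOException {
    if (getSuperstep() == 0) {
      // Step 1: only the root starts reached; it pings all its friends.
      boolean isRoot = vertex.getId().get() == ROOT_ID;
      vertex.setValue(new LongWritable(isRoot ? 0 : -1));
      if (isRoot) {
        sendMessageToAllEdges(vertex, new LongWritable(1)); // content unused, just a ping
      }
    } else if (vertex.getValue().get() < 0) {
      // Steps 2.x, 3.x, ...: a first ping arriving at superstep s means distance s.
      long hop = getSuperstep();
      vertex.setValue(new LongWritable(hop));
      if (hop < MAX_DEGREE) {
        sendMessageToAllEdges(vertex, new LongWritable(1));
      }
    }
    vertex.voteToHalt();
  }
}

After MAX_DEGREE supersteps, every vertex with a non-negative value sits in one of the root's circles; this marks the members in place instead of literally sending friend lists back to the root, and the marked vertices can then be dumped by the output format.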

You could also collect the circles with a cascade of map-reduce jobs, though this is not a very efficient way to solve the task (a driver-loop sketch follows the list):

  1. Export the root user's friends to a file circle-001.
  2. Use circle-001 as the input of a job that writes the friends of every circle-001 user to circle-002.
  3. Do the same, but use circle-002 as the input.
  4. ...
  5. Repeat N times.
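
Sketched as a Hadoop driver loop, the cascade might look like this. FriendsMapper, FriendsReducer, and the CircleDriver class are hypothetical placeholders: the real mapper would have to join each circle file against the friendship list and drop users already seen in an earlier circle:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

Configuration conf = new Configuration();
Path prev = new Path("circle-001");        // step 1: the root user's friends, exported beforehand
for (int i = 2; i <= n; i++) {             // steps 2..N: one job per circle
  Path next = new Path(String.format("circle-%03d", i));
  Job job = new Job(conf, "circle-" + i);
  job.setJarByClass(CircleDriver.class);     // hypothetical driver class
  job.setMapperClass(FriendsMapper.class);   // hypothetical: expands each user id to its friends
  job.setReducerClass(FriendsReducer.class); // hypothetical: dedupes the expanded ids
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, prev);
  FileOutputFormat.setOutputPath(job, next);
  if (!job.waitForCompletion(true)) break;
  prev = next;                               // this circle becomes the next job's input
}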

If you have many users whose circles need to be computed, the first approach is the better fit. The second has the big overhead of starting multiple MR jobs, but it is much simpler and fine for a small number of input users.

Answered 2013-07-26T12:52:43.217

I'm new to this area, but here are my thoughts.

You can use the traditional BFS algorithm, following the pseudo-code below.

In each iteration you launch a Hadoop job that discovers all not-yet-visited child nodes of the current working set.

List<Integer> BFS(List<Integer> curNodes, List<Integer> visited, int depth) {
    if (depth <= 0) {
        return visited;
    }

    // Run a Hadoop job on the current working set curNodes, restricted by visited.
    // The job populates a result list with the child nodes of the current working set.
    List<Integer> result = runJob(curNodes, visited); // placeholder for the job launch shown below

    // Then:
    visited.addAll(result);
    curNodes.clear();
    curNodes.addAll(result);

    return BFS(curNodes, visited, depth - 1);
}

The mapper and reducer for this job would look like the following.

In this example I simply use static members to hold the working set, the visited set, and the result set.

It should really be implemented with temp files; there may be ways to optimize the persistence of the temporary data accumulated from one iteration to the next.

The input file I used for the job contains the dumped friendship pairs, one per line, e.g. 1,2 2,3 5,4 ... ...

  public static class VertexMapper extends
      Mapper<Object, Text, IntWritable, IntWritable> {

    private static Set<IntWritable> curVertex = null;
    private static IntWritable curLevel = null;
    private static Set<IntWritable> visited = null;

    // Reused writables holding the parsed edge endpoints; they shadow the
    // map() parameters, hence the explicit this. qualifiers below.
    private IntWritable key = new IntWritable();
    private IntWritable value = new IntWritable();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {

      StringTokenizer itr = new StringTokenizer(value.toString(), ",");
      if (itr.countTokens() == 2) {
        String keyStr = itr.nextToken();
        String valueStr = itr.nextToken();
        try {
          this.key.set(Integer.parseInt(keyStr));
          this.value.set(Integer.parseInt(valueStr));

          // Emit only edges that start in the current working set and lead
          // to a not-yet-visited vertex; self-loops are skipped.
          if (VertexMapper.curVertex.contains(this.key)
              && !VertexMapper.visited.contains(this.value)
              && !this.key.equals(this.value)) {
            context.write(VertexMapper.curLevel, this.value);
          }
        } catch (NumberFormatException e) {
          System.err.println("Found key,value <" + keyStr + "," + valueStr
              + "> which cannot be parsed as int");
        }
      } else {
        System.err.println("Found malformed line: " + value.toString());
      }
    }
  }

  public static class UniqueReducer extends
      Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    // Accumulates the unique set of vertices discovered during this iteration.
    private static Set<IntWritable> result = new HashSet<IntWritable>();

    public void reduce(IntWritable key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {

      for (IntWritable val : values) {
        UniqueReducer.result.add(new IntWritable(val.get()));
      }
      // context.write(key, key);
    }
  }

Running the job would look something like this:

// Seed the search: level 1, working set = {user 1}, visited = {user 1}.
UniqueReducer.result.clear();
VertexMapper.curLevel = new IntWritable(1);
VertexMapper.curVertex = new HashSet<IntWritable>(1);
VertexMapper.curVertex.add(new IntWritable(1));
VertexMapper.visited = new HashSet<IntWritable>(1);
VertexMapper.visited.add(new IntWritable(1));

Configuration conf = getConf();
Job job = new Job(conf, "BFS");
job.setJarByClass(BFSExample.class);
job.setMapperClass(VertexMapper.class);
job.setCombinerClass(UniqueReducer.class);
job.setReducerClass(UniqueReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
job.setOutputFormatClass(NullOutputFormat.class);
boolean result = job.waitForCompletion(true);

BFSExample bfs = new BFSExample();
ToolRunner.run(new Configuration(), bfs, args);
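
To walk several levels, the seeding and submission above would sit inside a per-level loop mirroring the BFS pseudo-code. A hedged sketch, where runLevelJob is a hypothetical helper wrapping the Job setup shown above; note the static-member approach only holds together in Hadoop's local mode, where mapper, reducer, and driver share one JVM (hence the temp-file caveat earlier):

// Seed the search from user 1.
VertexMapper.curVertex = new HashSet<IntWritable>();
VertexMapper.curVertex.add(new IntWritable(1));
VertexMapper.visited = new HashSet<IntWritable>();
VertexMapper.visited.add(new IntWritable(1));

for (int level = 1; level <= 3; level++) {           // degrees 1..3
  VertexMapper.curLevel = new IntWritable(level);
  UniqueReducer.result.clear();
  runLevelJob(conf, args);                           // hypothetical: submits the job configured above
  VertexMapper.visited.addAll(UniqueReducer.result); // mark the newly found friends
  VertexMapper.curVertex =                           // they become the next working set
      new HashSet<IntWritable>(UniqueReducer.result);
}
// visited now holds user 1 plus its level-1..3 circles.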
Answered 2013-07-27T19:54:25.863

Maybe you could use Hive, which supports SQL-like queries!

Answered 2013-07-26T02:43:22.340