3

背景:

我正在尝试使用 MapReduce 在 Hadoop 上的 Java 中创建一个“文档项”矩阵。文档术语矩阵就像一个巨大的表格,其中每一行代表一个文档,每一列代表一个可能的单词/术语。

问题陈述:

假设我已经有一个术语索引列表(以便我知道哪个术语与哪个列号相关联),那么在每个文档中查找每个术语的索引的最佳方法是什么,以便我可以逐行构建矩阵-row(即,逐个文档)?

到目前为止,我可以想到两种方法:

方法#1:

将术语索引列表存储在 Hadoop 分布式文件系统上。每次映射器读取一个新文档以进行索引时,都会产生一个新的 MapReduce 作业——文档中每个唯一单词都有一个作业,其中每个作业都在分布式术语列表中查询其术语。这种方法听起来有点矫枉过正,因为我猜测启动新工作会产生一些开销,而且这种方法可能需要数千万个工作。另外,我不确定是否可以在另一个 MapReduce 作业中调用 MapReduce 作业。

方法#2:

将术语索引列表附加到每个文档,以便每个映射器最终得到术语索引列表的本地副本。这种方法在存储方面非常浪费(术语索引列表的副本与文档的数量一样多)。另外,我不确定如何将术语索引列表与每个文档合并——我会将它们合并到映射器还是减速器中?

问题更新 1

输入文件格式:

输入文件将是一个包含所有文档(产品评论)的 CSV(逗号分隔值)文件。文件中没有列标题,但每个评论的值按以下顺序显示:product_id、review_id、review、stars。下面是一个假的例子:

“产品 A”、“1”、“产品 A 非常非常昂贵。”、“2”</p>

“产品 G”、“2”、“很棒的产品!!”、“5”</p>

术语索引文件格式:

术语索引文件中的每一行包含以下内容:索引号、制表符和单词。每个可能的单词在索引文件中只列出一次,因此术语索引文件类似于 SQL 表的主键(单词)列表。对于特定文档中的每个单词,我的初步计划是遍历术语索引文件的每一行,直到找到该单词。然后将该单词的列号定义为与该单词关联的列/术语索引。下面是术语索引文件的示例,它是使用前面提到的两个示例产品评论构建的。

1 真棒

2 产品

3个

4 是

5 非常

6贵

输出文件格式:

我希望输出采用“矩阵市场”(MM)格式,这是压缩具有许多零的矩阵的行业标准。这是理想的格式,因为大多数评论只包含所有可能单词的一小部分,因此对于特定文档,只需要指定非零列。

MM 格式的第一行有三个制表符分隔值:文档总数、单词列总数和 MM 文件中不包括标题的总行数。在标题之后,每个附加行都包含与特定条目关联的矩阵坐标,以及条目的值,按以下顺序排列:reviewID、wordColumnID、条目(该词在评论中出现的次数)。有关 Matrix Market 格式的更多详细信息,请参阅此链接: http: //math.nist.gov/MatrixMarket/formats.html

每个评论的 ID 将等于其在文档术语矩阵中的行索引。这样我就可以在 Matrix Market 格式中保留评论的 ID,这样我仍然可以将每条评论与其星级相关联。我的最终目标——这超出了这个问题的范围——是建立一个自然语言处理算法来根据其文本预测新评论中的星数。

使用上面的示例,最终输出文件将如下所示(我无法让 Stackoverflow 显示制表符而不是空格):

2 6 7

1 2 1

1 3 1

1 4 1

1 5 2

1 6 1

2 1 1

2 2 1

4

3 回答 3

2

好吧,您可以使用类似于倒排索引概念的东西。

我建议这样做是因为,我假设这两个文件都很大。因此,像一对一一样相互比较将是真正的性能瓶颈。

这是一种可以使用的方法-

您可以将输入文件格式 csv 文件(例如 datafile1、datafile2)和术语索引文件(例如 term_index_file)作为作业的输入。

然后在每个映射器中,过滤源文件名,如下所示 -

映射器的伪代码 -

map(key, row, context){
  String filename= ((FileSplit)context.getInputSplit()).getPath().getName();
   if (filename.startsWith("datafile") {

     //split the review_id, words from row
     ....
     context.write(new Text("word), new Text("-1 | review_id"));

   } else if(filename.startsWith("term_index_file") {
     //split index and word
     ....
     context.write(new Text("word"), new Text("index | 0"));
   }

}

例如来自不同映射器的输出

Key       Value      source
product   -1|1       datafile
very       5|0       term_index_file
very      -1|1       datafile
product   -1|2       datafile
very      -1|1       datafile
product    2|0       term_index_file
...
...

说明(示例): 因为它清楚地表明键将是您的单词,而值将由分隔符“|”分隔的两部分组成

如果源是数据文件,则发出 key=product 和 value=-1|1,其中 -1 是虚拟元素,1 是 review_id。

如果源是 term_index_file,那么你发出 key=product 和 value=2|0,其中 2 是单词“product”的索引, 0 是虚拟的 review_id,我们将使用它来进行排序——稍后解释。

当然,如果我们将 term_index_file 作为正常的输入文件提供给作业,两个不同的映射器将不会处理重复的索引。因此, term_index_file 中的“产品、变化”或任何其他索引词将仅对一个映射器可用。请注意,这仅对 term_index_file 而不是数据文件有效。

下一步:

Hadoop mapreduce 框架,你可能很清楚,会按键分组所以,你会有这样的东西去不同的减速器,

reduce-1: key=product, value=<-1|1, -1|2, 2|0>
reduce-2: key=very, value=<5|0, -1|1, -1|1>

但是,在上述情况下,我们遇到了问题。我们希望对 '|' 之后的值进行排序 即在reduce-1 -> 2|0, -1|1, -1|2 and in reduce-2 -> <5|0, -1|1, -1|1>

为此,您可以使用使用排序比较器实现的辅助排序。请为此谷歌,但这里有一个可能有帮助的链接。在这里提到它可能会很长。

在每个 reduce-1 中,由于值按上述方式排序,因此当我们开始迭代时,我们将在第一次迭代中获得'0'以及index_id=2,然后可以将其用于后续迭代。在接下来的两次迭代中,我们连续获得评论 ID 1 和 2,并且我们使用计数器,以便我们可以跟踪任何重复的评论 ID。当我们得到重复的评论 id 时,这意味着一个词在同一个 review_id 行中出现了两次。仅当我们找到不同的 review_id 并发出特定 index_id 的先前 review_id 详细信息时,我们才会重置计数器,如下所示 -

previous_review_id  + "\t" + index_id + "\t" + count

当循环结束时,我们将留下一个 previous_review_id,我们最终以相同的方式发出它。

减速器的伪代码 -

reduce(key, Iterable values, context) {
  String index_id = null;
  count = 1;
  String previousReview_id = null;
  for(value: values) {
      Split split[] = values.split("\\|");
      ....

      //when consecutive review_ids are same, we increment count
      //and as soon as the review_id differ, we emit, reset the counter and print
      //the previous review_id detected.
      if (split[0].equals("-1") && split[1].equals(previousReview_id)) {
          count++;
      } else if(split[0].equals("-1") && !split[1].equals(prevValue)) {
          context.write(previousReview_id + "\t" + index_id + "\t" + count);
          previousReview_id = split[1];//resting with new review_id id
          count=1;//resetting count for new review_id
      } else {
         index_id = split[0]; 
      }
  }
  //the last  previousReview_id will be left out, 
  //so, writing it now after the loop  completion
  context.write(previousReview_id + "\t" + index_id + "\t" + count);

}

这项工作是使用多个 reducer 完成的,以便利用 Hadoop 以实现其最出名的性能 - 结果,最终输出将分散,如下所示,偏离您想要的输出。

1 4 1
2 1 1
1 5 2
1 2 1
1 3 1
1 6 1
2 2 1

但是,如果您希望根据 review_id 对所有内容进行排序(作为您想要的输出),您可以再编写一个作业来为您使用单个减速器和 previos 作业的输出作为输入。同时也计算2 6 7并将其放在输出的前面。

我认为这只是一种方法(或想法),可能会对您有所帮助。您肯定想修改它,使用更好的算法并以您认为对您有利的方式使用它。

您还可以使用复合键来获得比使用分隔符(例如“|”)更好的清晰度。

我愿意澄清。请询问您是否认为,它可能对您有用。

谢谢!

于 2013-08-18T22:43:04.610 回答
1

您可以在 Hadoop 分布式缓存中加载术语索引列表,以便映射器和减速器可以使用它。例如,在 Hadoop 流中,您可以按如下方式运行您的作业:

$ hadoop  jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar \
  -input myInputDirs \
  -output myOutputDir \
  -mapper myMapper.py \
  -reducer myReducer.py \
  -file myMapper.py \
  -file myReducer.py \
  -file myTermIndexList.txt

现在在 myMapper.py 中,您可以加载文件 myTermIndexList.txt 并将其用于您的目的。如果您对输入和所需输出进行更详细的描述,我可以为您提供更多详细信息。

于 2013-08-18T18:40:23.060 回答
0

方法 #1 不好,但如果您没有太多 hadoop 经验,则非常常见。开始工作非常昂贵。您要做的是拥有 2-3 个工作,这些工作相互补充以获得所需的结果。类似问题的一个常见解决方案是让映射器标记输入和输出对,将它们分组在减速器中执行某种计算,然后将其输入作业 2。在作业 2 的映射器中,您以某种方式反转数据并在减速器中做一些其他的计算。

我强烈建议通过培训课程了解更多关于 Hadoop 的信息。有趣的是,Cloudera 的开发课程与您要解决的问题非常相似。或者,除了一门课程之外,我还会查看“使用 MapReduce 进行数据密集型文本处理”,特别是“计算相对频率”和“文本检索的反向索引”部分

http://lintool.github.io/MapReduceAlgorithms/MapReduce-book-final.pdf

于 2013-08-18T22:20:44.967 回答