好吧,您可以使用类似于倒排索引概念的东西。
我建议这样做是因为,我假设这两个文件都很大。因此,像一对一一样相互比较将是真正的性能瓶颈。
这是一种可以使用的方法-
您可以将输入文件格式 csv 文件(例如 datafile1、datafile2)和术语索引文件(例如 term_index_file)作为作业的输入。
然后在每个映射器中,过滤源文件名,如下所示 -
映射器的伪代码 -
map(key, row, context){
String filename= ((FileSplit)context.getInputSplit()).getPath().getName();
if (filename.startsWith("datafile") {
//split the review_id, words from row
....
context.write(new Text("word), new Text("-1 | review_id"));
} else if(filename.startsWith("term_index_file") {
//split index and word
....
context.write(new Text("word"), new Text("index | 0"));
}
}
例如来自不同映射器的输出
Key Value source
product -1|1 datafile
very 5|0 term_index_file
very -1|1 datafile
product -1|2 datafile
very -1|1 datafile
product 2|0 term_index_file
...
...
说明(示例):
因为它清楚地表明键将是您的单词,而值将由分隔符“|”分隔的两部分组成
如果源是数据文件,则发出 key=product 和 value=-1|1,其中 -1 是虚拟元素,1 是 review_id。
如果源是 term_index_file,那么你发出 key=product 和 value=2|0,其中 2 是单词“product”的索引, 0 是虚拟的 review_id,我们将使用它来进行排序——稍后解释。
当然,如果我们将 term_index_file 作为正常的输入文件提供给作业,两个不同的映射器将不会处理重复的索引。因此, term_index_file 中的“产品、变化”或任何其他索引词将仅对一个映射器可用。请注意,这仅对 term_index_file 而不是数据文件有效。
下一步:
Hadoop mapreduce 框架,你可能很清楚,会按键分组所以,你会有这样的东西去不同的减速器,
reduce-1: key=product, value=<-1|1, -1|2, 2|0>
reduce-2: key=very, value=<5|0, -1|1, -1|1>
但是,在上述情况下,我们遇到了问题。我们希望对 '|' 之后的值进行排序 即在reduce-1 -> 2|0, -1|1, -1|2 and in reduce-2 -> <5|0, -1|1, -1|1>
为此,您可以使用使用排序比较器实现的辅助排序。请为此谷歌,但这里有一个可能有帮助的链接。在这里提到它可能会很长。
在每个 reduce-1 中,由于值按上述方式排序,因此当我们开始迭代时,我们将在第一次迭代中获得'0'以及index_id=2,然后可以将其用于后续迭代。在接下来的两次迭代中,我们连续获得评论 ID 1 和 2,并且我们使用计数器,以便我们可以跟踪任何重复的评论 ID。当我们得到重复的评论 id 时,这意味着一个词在同一个 review_id 行中出现了两次。仅当我们找到不同的 review_id 并发出特定 index_id 的先前 review_id 详细信息时,我们才会重置计数器,如下所示 -
previous_review_id + "\t" + index_id + "\t" + count
当循环结束时,我们将留下一个 previous_review_id,我们最终以相同的方式发出它。
减速器的伪代码 -
reduce(key, Iterable values, context) {
String index_id = null;
count = 1;
String previousReview_id = null;
for(value: values) {
Split split[] = values.split("\\|");
....
//when consecutive review_ids are same, we increment count
//and as soon as the review_id differ, we emit, reset the counter and print
//the previous review_id detected.
if (split[0].equals("-1") && split[1].equals(previousReview_id)) {
count++;
} else if(split[0].equals("-1") && !split[1].equals(prevValue)) {
context.write(previousReview_id + "\t" + index_id + "\t" + count);
previousReview_id = split[1];//resting with new review_id id
count=1;//resetting count for new review_id
} else {
index_id = split[0];
}
}
//the last previousReview_id will be left out,
//so, writing it now after the loop completion
context.write(previousReview_id + "\t" + index_id + "\t" + count);
}
这项工作是使用多个 reducer 完成的,以便利用 Hadoop 以实现其最出名的性能 - 结果,最终输出将分散,如下所示,偏离您想要的输出。
1 4 1
2 1 1
1 5 2
1 2 1
1 3 1
1 6 1
2 2 1
但是,如果您希望根据 review_id 对所有内容进行排序(作为您想要的输出),您可以再编写一个作业来为您使用单个减速器和 previos 作业的输出作为输入。同时也计算2 6 7并将其放在输出的前面。
我认为这只是一种方法(或想法),可能会对您有所帮助。您肯定想修改它,使用更好的算法并以您认为对您有利的方式使用它。
您还可以使用复合键来获得比使用分隔符(例如“|”)更好的清晰度。
我愿意澄清。请询问您是否认为,它可能对您有用。
谢谢!