0

我正在尝试使用 Python 中的 mrjob 框架编写一个 MapReduce 程序来计算Trigram 。到目前为止,这就是我所拥有的:

from mrjob.job import MRJob

class MRTrigram(MRJob):

    def mapper(self, _, line):
        w = line.split()
        for idx,word in enumerate(w):
            if idx < len(w) - 2:
                # Generate a trigram using the current word and next 2 words
                trigram = w[idx] + " " + w[idx + 1] + " " + w[idx + 2]
                yield trigram, 1

    def reducer(self, key, values):
        yield sum(values), key

# ignore this part - its just standard bolierplate for mrjob!
if __name__ == '__main__':
    MRTrigram.run()

可以看出,我没有处理三元组跨行拆分的情况(例如,第 3 行末尾的“它是”,第 4 行开头的“最好的时代” - 但我的代码会在这种情况下,不要捕获三元组“它是”!)。

如何跨多个映射调用保留状态,确保无论映射器由底层运行时分配作业,只计算连续行的三元组?我曾想过将每行的最后 2 个单词存储在 MRTrigram 类中的持久数据结构中,但后来我意识到我无法保证我是否在比较第 i 行和第 i+1 行(而不是第 i、j 行,其中j 可以在文档中的任何位置行!)。

有什么想法可以让我走上正轨吗?

4

1 回答 1

0

您可能会通过编写自定义协议来获得有关如何完成此操作的提示,但我相信 mrjob 在您可以添加自定义行为(即形成键和值)之前采用由换行符分隔的流输入,所以它可能mrjob 不可能。

如果您使用的是 Hadoop(即本机 Java),那么您可以编写一个自定义输入格式,该格式采用多行文本并从中解析出一个键值对。

于 2014-04-17T17:09:04.527 回答