我正在尝试使用 Python 中的 mrjob 框架编写一个 MapReduce 程序来计算Trigram 。到目前为止,这就是我所拥有的:
from mrjob.job import MRJob
class MRTrigram(MRJob):
def mapper(self, _, line):
w = line.split()
for idx,word in enumerate(w):
if idx < len(w) - 2:
# Generate a trigram using the current word and next 2 words
trigram = w[idx] + " " + w[idx + 1] + " " + w[idx + 2]
yield trigram, 1
def reducer(self, key, values):
yield sum(values), key
# ignore this part - its just standard bolierplate for mrjob!
if __name__ == '__main__':
MRTrigram.run()
可以看出,我没有处理三元组跨行拆分的情况(例如,第 3 行末尾的“它是”,第 4 行开头的“最好的时代” - 但我的代码会在这种情况下,不要捕获三元组“它是”!)。
如何跨多个映射调用保留状态,确保无论映射器由底层运行时分配作业,只计算连续行的三元组?我曾想过将每行的最后 2 个单词存储在 MRTrigram 类中的持久数据结构中,但后来我意识到我无法保证我是否在比较第 i 行和第 i+1 行(而不是第 i、j 行,其中j 可以在文档中的任何位置行!)。
有什么想法可以让我走上正轨吗?