2

我开始使用 python 的mrjob将我的一些长期运行的 python 程序转换为 MapReduce hadoop 作业。我已经得到了简单的字数统计示例,并且我在概念上理解了“文本分类”示例。

但是,我在确定解决问题所需的步骤时遇到了一些麻烦。

我有多个文件(大约 6000 个),每个文件有 2 到 800 行。在这种情况下,每一行都是一个简单的以空格分隔的“信号”。我需要比较每个文件中的每一行与所有文件(包括其自身)中的每一行之间的相关性。然后根据相关系数输出结果。

一个文件的示例:

1 2 3 4 2 3 1 2 3 4 1 2
2 2 3 1 3 3 1 2 3 1 4 1
2 3 4 5 3 2 1 3 4 5 2 1
...

我需要将此文件的每一行与其他每个文件中的每一行配对生成......或者如果这会使事情变得更容易,我可以将所有文件连接到一个文件中,但我仍然需要成对迭代。

我了解如何进行计算以及如何使用最终的 reduce 步骤来聚合和过滤结果。我遇到的困难是如何在yield不读取单个 setp 中的所有文件的情况下将所有成对的项目添加到连续的步骤中?我想我可以提前准备一个输入文件,itertools.product但这个文件会非常大。

4

1 回答 1

1

好吧,既然没有人想出答案,我会发布我目前的解决方法,以防其他人需要它。我不确定这是多么“规范”或高效,但它到目前为止有效。

我将文件名作为文件每一行的第一项,然后是 a,\t然后是其余数据。对于这个例子,我只是在每一行上使用一个数字,然后对它们进行平均,就像一个非常简单的例子。

然后我在mrjob.

class MRAvgPairwiseLines(MRJob):

def input_mapper(self, _, value):
    """Takes each input line and converts it to (fnum, num) and a key of 'ALL'"""

    fnum, val = value.split('\t')
    yield 'ALL', (fnum, val)

def input_reducer(self, key, values):

    for (fnum1, val1), (fnum2, val2) in product(values, repeat = 2):
        yield fnum1, (fnum1, fnum2, val1, val2)

def do_avg(self, key, value):

    fnum1, fnum2, val1, val2 = value
    res = (float(val1)+float(val2))/float(2)
    yield key, (fnum2, res)

def get_max_avg(self, key, values):

    max_fnum, max_avg = max(values, key = lambda x: x[1])
    yield key, (max_fnum, max_avg)

def steps(self):
    return [self.mr(mapper=self.input_mapper, reducer=self.input_reducer),
                self.mr(mapper=self.do_avg, reducer=self.get_max_avg)]

这样,input_mapper函数的所有输出都被分组为相同的input_reducer,然后yield是连续的对。然后将它们传递到适当的位置,最终返回最大的平均值(这实际上是所有其他文件中最大的项目)。

希望对某人有所帮助。

于 2011-07-10T23:44:24.157 回答