7

我正在尝试学习将 Yelp 的 Python API 用于 MapReduce、MRJob。他们简单的单词计数器示例很有意义,但我很好奇如何处理涉及多个输入的应用程序。例如,不是简单地计算文档中的单词,而是将向量乘以矩阵。我想出了这个解决方案,它的功能,但感觉很傻:

class MatrixVectMultiplyTast(MRJob):
    def multiply(self,key,line):
            line = map(float,line.split(" "))
            v,col = line[-1],line[:-1]

            for i in xrange(len(col)):
                    yield i,col[i]*v

    def sum(self,i,occurrences):
            yield i,sum(occurrences)

    def steps(self):
            return [self.mr (self.multiply,self.sum),]

if __name__=="__main__":
    MatrixVectMultiplyTast.run()

运行此代码./matrix.py < input.txt,它起作用的原因是矩阵按列存储在 input.txt 中,相应的向量值位于行尾。

因此,以下矩阵和向量:

在此处输入图像描述

表示为 input.txt 为:

在此处输入图像描述

简而言之,我将如何更自然地将矩阵和向量存储在单独的文件中并将它们都传递给 MRJob?

4

5 回答 5

3

如果您需要针对另一个(或相同的 row_i、row_j)数据集处理原始数据,您可以:

1) 创建一个 S3 存储桶来存储数据的副本。将此副本的位置传递给您的任务类,例如下面代码中的 self.options.bucket 和 self.options.my_datafile_copy_location。警告:不幸的是,似乎整个文件必须在处理之前“下载”到任务机器。如果连接失败或加载时间过长,此作业可能会失败。这是一些执行此操作的 Python/MRJob 代码。

把它放在你的映射器函数中:

d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
    d2 = line2.split('\t', 1)
    v2, col2 = d2[0], d2[1]
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
conn.close()

2) 创建一个 SimpleDB 域,并将所有数据存储在其中。在这里阅读 boto 和 SimpleDB: http ://code.google.com/p/boto/wiki/SimpleDbIntro

您的映射器代码如下所示:

dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
    v2, c2 = item.name, item['column']
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
sdb.close()

如果您有大量数据,则第二个选项可能会执行得更好,因为它可以对每一行数据发出请求,而不是一次请求全部数据。请记住,SimpleDB 值最多只能是 1024 个字符长,因此如果您的数据值长于该值,您可能需要通过某种方法进行压缩/解压缩。

于 2012-06-12T20:01:32.013 回答
2

这就是我使用多个输入并基于文件名在映射器阶段进行适当更改的方式。

跑步者计划:

from mrjob.hadoop import *


#Define all arguments

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/'
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S')
hadoop_bin = '/usr/bin/hadoop'
mode = 'hadoop'
hs = HadoopFilesystem([hadoop_bin])

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"]

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin]
aargs.extend(input_file_names)
aargs.extend(['-o',output_dir])
print aargs
status_file = True

mr_job = MRJob(args=aargs)
with mr_job.make_runner() as runner:
    runner.run()
os.environ['HADOOP_HOME'] = ''
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))

MRJob 类:

class MR_Job(MRJob):
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value'
    def mapper(self, _, line):
    """
    This function reads lines from file.
    """
    try:
        #Need to clean email.
        input_file_name = get_jobconf_value('map.input.file').split('/')[-2]
                """
                Mapper code
                """
    except Exception, e:
        print e

    def reducer(self, email_id,visitor_id__date_time):
    try:
        """
                Reducer Code
                """
    except:
        pass


if __name__ == '__main__':
    MRV_Email.run()
于 2014-12-24T20:54:31.987 回答
2

您的问题的实际答案是 mrjob 还不完全支持 hadoop 流连接模式,即读取 map_input_file 环境变量(它公开 map.input.file 属性)以确定您正在处理的文件类型在其路径和/或名称上。

如果您可以轻松地从读取数据本身中检测到它属于哪种类型,那么您可能仍然可以完成它,如本文所示:

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

然而,这并不总是可能的......

否则 myjob 看起来很棒,我希望他们将来可以为此添加支持。在那之前,这对我来说几乎是一个交易破坏者。

于 2013-09-13T17:59:51.190 回答
1

在我的理解中,除非您想利用来自 Amazon 的 Hadoop 集群或 Hadoop 服务,否则即使示例利用在本地文件上运行,您也不会使用 MrJob。

MrJob 主要使用“ Hadoop 流”来提交作业。

这意味着所有从 Hadoop 指定为文件或文件夹的输入都被流式传输到映射器,随后的结果流到化简器。所有映射器都获得一个输入切片,并认为所有输入在示意上是相同的,以便它统一解析和处理每个数据切片的键、值。

根据这种理解,映射器的输入在示意图上是相同的。唯一可能包含两个不同原理图数据的方法是将它们交错在同一个文件中,以便映射器可以理解哪些是矢量数据,哪些是矩阵数据。

You are actually doing it already.

如果一行是矩阵数据或矢量数据,您可以通过使用一些说明符来简单地改进它。一旦你看到一个向量数据,那么前面的矩阵数据就会被应用到它上面。

matrix, 1, 2, ...
matrix, 2, 4, ...
vector, 3, 4, ...
matrix, 1, 2, ...
.....

但是您提到的过程运行良好。您必须将所有原理图数据放在一个文件中。

但这仍然存在问题。当完整的模式存在于单行中并且包含完整的单个处理单元时,K,V map reduce 效果更好。

根据我的理解,您已经正确地做到了,但我猜 Map-Reduce 不适合这种数据。我希望有人能比我更进一步地澄清这一点。

于 2012-06-12T20:39:37.657 回答
1

MrJob基础状态:

您可以传递多个输入文件,与标准输入混合(使用 - 字符):

$ python my_job.py input1.txt input2.txt - < input3.txt
于 2020-04-15T12:03:17.923 回答