0

我正在尝试在亚马逊的 EMR 上运行 mrjob。我已经使用内联运行器在本地测试了该作业,但在 Amazon 上运行时它失败了。我已将失败缩小到我对外部数据文件的依赖zip_codes.txt。如果我使用硬编码的邮政编码数据在没有这种依赖关系的情况下运行,它就可以正常工作。

我尝试使用上传文件参数包含必要的数据文件。当我查看 S3 时,该文件确实存在,但显然出现了问题,因此我无法在本地访问它。

在此处输入图像描述

这是我的mrjob.conf文件:

runners:
  emr:
    aws_access_key_id: FOOBARBAZQUX
    aws_secret_access_key: IAMASECRETKEY
    aws_region: us-east-1
    ec2_key_pair: mapreduce
    ec2_key_pair_file: $ENV/keys/mapreduce.pem
    ssh_tunnel_to_job_tracker: true
    ssh_tunnel_is_open: true
    cleanup_on_failure: ALL
    cmdenv:
      TZ: America/Los_Angeles 

这是我的MR_zip.py文件。

from mrjob.job import MRJob
import mrjob
import csv

def distance(p1, p2):
    # d = ...    
    return d

class MR_zip(MRJob):
    OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol
    zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))}

    def mapper(self, _, line):
        zip_code_1, poi = line.split(",")
        zip_code_1 = int(zip_code_1)
        lat1, lon1 = self.zip_codes[zip_code_1]
        for zip_code_2, (lat2, lon2) in self.zip_codes.items():
            d = distance((lat1, lon1), (lat2, lon2))
            yield zip_code_2, (zip_code_1, poi, d)

    def reducer(self, zip_code_1, ds):
        result = {}
        for zip_code_2, poi, d in ds:
            if poi not in result:
                result[poi] = (zip_code_2, d)
            elif result[poi][1] > d:
                result[poi] = (zip_code_2, d)
        yield zip_code_1, result

if __name__ == '__main__':
    MR_zip.run()

最后,我使用以下命令运行它:

python MR_zip.py -r emr --conf mrjob.conf --file zip_codes.txt < poi.txt

其中 zip_codes.txt 看起来像:

...
62323,39.817702,-90.66923
62324,39.988988,-90.94976
62325,40.034398,-91.16278
62326,40.421857,-90.80333
...

poi.txt 看起来像:

...
210,skate park
501,theatre
29001,theatre
8001,knitting club
20101,food bank
...
4

2 回答 2

3

此外,您可能会发现有用MRJob.add_file_option的例程。例如,指定

self.add_file_option('--config-file', dest='config_file', 
    default=None, help='file with labels', action="append")

self.options.config_file您可以通过路径列表引用上传的文件。

于 2013-11-01T13:42:49.410 回答
1

概述

我的代码中有两个错误:

  1. 步骤的初始化代码应该在步骤的初始化程序中
  2. 默认情况下,EMR 使用 Python 2.6,它排除了字典理解等

步骤初始化

每一步都有对应的初始化方法。例如,mapperhasmapper_init可用于初始化映射器中使用的数据。函数reducercombiner初始化方法相似。如果您使用该steps函数来定义自己的步骤,那么您还可以定义您使用的初始化函数。在此处阅读有关初始化程序的更多信息。

当心 Python 版本

截至今天,EMR 默认使用 Python 2.6.6 版本。因此,任何对更高版本的依赖都可能在本地运行,但在 EMR 上会出现问题。

修复

要修复上面的代码,需要删除定义zip_codesMR_zip.py

zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))}

而是在mapper_init不使用字典推导的情况下定义它。

def mapper_init(self):
    self.zip_codes = {}
    for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r")):
        self.zip_codes[int(zip_code)] = (float(latitude), float(longitude))

其他文件和命令行保持不变。

于 2013-09-25T08:00:18.213 回答