0

我想使用 EMR 将文本文件的内容拆分为 2 个不同的文件。输入文件以及 mapper 和 reducer 脚本都存储在 AWS 的 S3 中。目前,我的映射器通过制表符分隔整个文件中的每个字段来重新格式化标准输入的输入。

import sys
import time

first_line = True

for line in sys.stdin:
    if first_line == True:
            first_line = False
            continue
    line= line.strip()
    data=line.split('|')
    d = data[0]
    for i in range(1,len(data)):
            d = d + '\t' +str(data[i])
    d = d+ '\n'
    print d

我的减速器是魔法发生的地方。我希望减速器根据特定字段的值将此文本文件拆分为 2 个不同的文件。这是我当前的 reducer.py 代码

mobile_inquiries = open("reducer_output/mob_inq.txt", "a")
transactions = open("reducer_output/transactions.txt", "a")
mob_merchant_id='"99031479997"'
mob_response_code = '"0"'
mob_request_codes = ['"400"','"401"','"402"','"403"','"450"','"408"','"2400"','"2401"','"2402"','"2408"','"6400"','"6405"','"6450"']

for line in sys.stdin:          
    line= line.strip()
    data=line.split('\t')
    d = data[0]
    merchant_id = data[4]
    request_code = data[10]
    response_code = data[19]

# Writes to mobile inquiry file
    if (merchant_id == mob_merchant_id) and (response_code == mob_response_code) and (request_code in mob_request_codes):
        d = d + '\t' +str(data[9])+ '\t' + str(data[28])+'\n'               
        mobile_inquiries.write(d)
# Writes to transaction file
    else:
        d = d + '\t' +str(data[9])+ '\t' + str(data[6])+ '\t' + str(data[4])+ '\t' + str(data[26])+ '\t' + str(data[10])+ '\t' + str(data[19])+ '\t' + str(data[28])+ '\n'
        transactions.write(d)
mobile_inquiries.close()
transactions.close()

此 EMR 作业失败并返回以下错误消息:由于步骤失败而关闭。我已经在每一行上使用 fileReaders 在本地测试了这两个脚本,并且它可以工作。将任务导入 EMR 会导致问题。我的问题是: - 是否可以使用 EMR 将文件拆分为 2 个或更多文件?- 如果是这样,S3 是否会阻止我动态创建新文件,从而导致 EMR 作业失败?- 还是我的代码行为错误?

我感谢任何和所有的反馈。

谢谢你。

4

1 回答 1

0

您尝试执行此操作的方式行不通。即使作业成功 - 您也只是设法将文件写入 Hadoop 集群中每个节点上的本地文件系统。最有可能 - 一旦作业完成,这些文件将被丢弃。

奇怪的是,即使映射器发出一个 key\tvalue 结构,reducer 似乎也没有对给定键的值集合做任何事情。所以不清楚为什么还要费心按数据[0]分割地图输出?(也许我不明白上下文)

如果可能的话 - 这些将是更好的选择:

  • 首先使用仅地图作业将输入数据拆分为两个数据集(mobile_inquiries 和 transactions)。如果您愿意使用 Hive - 您可以选择一个表并根据谓词插入两个目录(在 HDFS 或 S3 中)(就像 Python 代码中的那个)
  • 现在输入已被拆分 - 在每个输出上运行一个 map-reduce 作业。这可以执行任何 map/reduce 功能。FWIW - 这里编码的 map-reduce 函数实际上并不需要 Python - 可以直接用标准 Hive SQL 表示。
于 2013-04-24T19:43:28.027 回答