
I'm running a large job on an AWS Elastic MapReduce cluster. By large I mean over 800,000 files, each with 25,000+ records. In my test runs I've been using 100 m1.medium spot instances to do the processing.

The job appears to run fine, but I've noticed that the output files (part-00000, part-00001, etc.) list records with the same key across multiple outputs. Shouldn't these have been reduced within EMR?

Any insight would be greatly appreciated.
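To show concretely what I mean, here is a rough check run against local copies of the part files (a sketch only; it assumes each output line is a comma-separated key,value pair, which matches my format) that lists every key appearing in more than one output file:

# check_keys.py -- sketch only: assumes the part-* files have been copied
# locally and that each output line is "key,value".
import glob
from collections import defaultdict

files_per_key = defaultdict(set)

for path in glob.glob("part-*"):
    with open(path) as f:
        for line in f:
            key = line.rstrip("\n").split(",", 1)[0]
            files_per_key[key].add(path)

# Report every key that was written to more than one output file.
for key, paths in files_per_key.items():
    if len(paths) > 1:
        print("%s appears in %d files: %s" % (key, len(paths), sorted(paths)))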


1 Answer


I ran into the same problem - I'm using EMR with the streaming API to build an "inverted index":

-input s3n://mybucket/html2 -output s3n://mybucket/results -mapper s3n://mybucket/mapper.py -reducer s3n://mybucket/reduce.py

s3n://mybucket/html2 contains a few HTML files, and

mapper.py is:

import sys

def main(args):
    for line in sys.stdin:
        line = line.strip()
        words = line.split()
        for word in words:
            # do some preprocessing
            if word.startswith("http://"):
                # output the URL with a count of 1
                print "%s,%s" % (word, 1)
            else:
                # cleanup HTML tags
                url = get_url()  # irrelevant
                print "%s,%s" % (word, url)

if __name__ == "__main__":
    main(sys.argv)

reduce.py is:

import sys

def main(args):
    current_word = None
    current_count = 0
    current_url_list = []
    key = None

    for line in sys.stdin:
        line = line.strip()
        (key, val) = line.split(',', 1)

        # If key is a URL - act as a word-count reducer
        if key.startswith("http:"):
            # convert count (currently a string) to int
            try:
                count = int(val)
            except ValueError:
                # count was not a number, so silently
                # ignore/discard this line
                continue

            # this IF-switch only works because Hadoop sorts map output
            # by key (here: word) before it is passed to the reducer
            if current_word == key:
                current_count += count
            else:
                if current_word:
                    # Check if previous word was a URL
                    if current_word.startswith('http:'):
                        print '%s,%s' % (current_word, current_count)
                    else:
                        # previous word was a regular word
                        print '%s,%s' % (current_word, ','.join(current_url_list))
                current_count = count
                current_word = key
        else:
            # If key is a word - act as a URL-list-appending reducer
            if current_word == key:
                if val not in current_url_list:
                    current_url_list.append(val)
            else:  # Got to a new key
                if current_word:
                    # Check if previous word was a URL
                    if current_word.startswith("http:"):
                        print '%s,%s' % (current_word, current_count)
                    else:
                        # previous word was a regular word
                        print '%s,%s' % (current_word, ','.join(current_url_list))
                current_url_list = []
                current_url_list.append(val)
                current_word = key

if __name__ == "__main__":
    main(sys.argv)
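For what it's worth, the sort-then-reduce phase can be replayed locally to see the sorted input that reduce.py relies on. This is only a rough stand-in, not the real job: it assumes reduce.py is in the working directory, that `python` launches an interpreter able to run its Python 2 print statements, and the sample records below are made up.

# local_shuffle_test.py -- rough local stand-in for the sort-and-reduce phase.
import subprocess

# Hypothetical intermediate records in the "key,value" form the mapper emits.
mapped = [
    "http://example.com/a,1",
    "hadoop,http://example.com/a",
    "http://example.com/a,1",
    "hadoop,http://example.com/b",
    "streaming,http://example.com/a",
]

# Hadoop sorts map output by key before handing it to the reducer; sorting the
# whole "key,value" lines groups identical keys together, which is what the
# IF-switch in reduce.py depends on.
shuffled = "\n".join(sorted(mapped)) + "\n"

proc = subprocess.Popen(["python", "reduce.py"],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE)
out, _ = proc.communicate(shuffled.encode())
print(out.decode())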

I'm launching this with the AWS Console wizard ("Create New Job Flow"), and apart from setting the input, output, mapper, and reducer scripts (and the log path), I kept everything at its defaults.
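For reference, the same streaming step could presumably also be submitted programmatically rather than through the wizard. A rough sketch using the boto 2 EMR API of that era (the region, instance count, and log path below are placeholders, not my actual settings):

# launch_jobflow.py -- sketch only; placeholder region, log path, and sizes.
import boto.emr
from boto.emr.step import StreamingStep

step = StreamingStep(
    name="Inverted index",
    mapper="s3n://mybucket/mapper.py",
    reducer="s3n://mybucket/reduce.py",
    input="s3n://mybucket/html2",
    output="s3n://mybucket/results",
)

conn = boto.emr.connect_to_region("us-east-1")
jobflow_id = conn.run_jobflow(
    name="Inverted index job flow",
    log_uri="s3n://mybucket/logs",
    master_instance_type="m1.medium",
    slave_instance_type="m1.medium",
    num_instances=3,
    steps=[step],
)
print(jobflow_id)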

In the output I get several files, and within them I see the same keys (each time with different values).

Maybe this sheds more light on the problem and helps get it solved.

answered 2013-08-11T18:40:31.457