我正在尝试使用 Python MRJob lib 执行 mapreduce 作业,但在让它正确分布在我的 Hadoop 集群中时遇到了一些问题。我相信我只是错过了 mapreduce 的基本原则。我的集群是一个小型的,一主一从的测试集群。基本思想是我只是请求一系列带有参数的网页,对它们进行一些分析并返回网页上的一些属性。
我的 map 函数的输入只是一个带有以下参数的 URL 列表:
这样初始输入的键值对就是 key:None, val:URL。
def mapper(self, key, url):
'''Yield domain as the key, and (url, query parameter) tuple as the value'''
parsed_url = urlparse(url)
domain = parsed_url.scheme + "://" + parsed_url.netloc + "/"
if self.myclass.check_if_param(parsed_url):
parsed_url_query = parsed_url.query
url_q_dic = parse_qs(parsed_url_query)
for query_param, query_val in url_q_dic.iteritems():
#yielding a tuple in mrjob will yield a list
yield domain, (url, query_param)
很简单,我只是检查以确保 URL 有一个参数并将 URL 的域作为键和一个元组给我 URL 和查询参数作为值,MRJob 友好地将其转换为一个列表以传递给减速器,它如下:
def reducer(self, domain, url_query_params):
final_list = []
for url_query_param in url_query_params:
url_to_list_props = url_query_param[0]
param_to_list_props = url_query_param[1]
#set our target that we will request and do some analysis on
self.myclass.set_target(url_to_list_props, param_to_list_props)
#perform a bunch of requests and do analysis on the URL requested
props_list = self.myclass.get_props()
for prop in props_list:
#index this stuff to a central db
MapReduceIndexer(domain, final_list).add_prop_info()
yield domain, final_list
我的问题是只运行一个减速器任务。我希望 reducer 任务的数量等于映射器发出的唯一键的数量。上面代码的最终结果是我有一个在master上运行的reducer,但是slave闲着什么也不做,这显然不理想。我注意到在我的输出中启动了一些映射器任务,但始终只有 1 个减速器任务。除此之外,任务运行顺利,所有工作都按预期进行。
我的问题是……我到底做错了什么?我是否误解了 reduce 步骤或在某处搞砸了我的键值对?为什么没有多个减速器在这个工作上运行?
更新:好的,所以从给出的答案中我将 mapred.reduce.tasks 增加到更高(这是我现在意识到的默认值是 1)。这确实是我得到 1 个减速器的原因。我现在看到同时执行 3 个 reduce 任务。我的奴隶现在有一个需要解决的导入错误,但至少我到了某个地方......