I want to read a list from a file in my Hadoop streaming job. This is my simple mapper.py:
#!/usr/bin/env python

import sys
import json

def read_file():
    id_list = []
    # read ids from a file
    f = open('../user_ids', 'r')
    for line in f:
        line = line.strip()
        id_list.append(line)
    return id_list

if __name__ == '__main__':
    id_list = set(read_file())
    # input comes from STDIN (standard input)
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
        line = json.loads(line)
        user_id = line['user']['id']
        if str(user_id) in id_list:
            print '%s\t%s' % (user_id, line)
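For context, each line on STDIN is expected to be a JSON object with a nested user id. The values below are made-up placeholders, just to show the shape the mapper assumes:

import json

# hypothetical input line, only to illustrate the expected structure
sample = '{"user": {"id": 12345}, "text": "..."}'
record = json.loads(sample)
print record['user']['id']   # prints 12345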
And here is my reducer.py:
#!/usr/bin/env python

from operator import itemgetter
import sys

current_id = None
current_list = []
id = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    id, line = line.split('\t', 1)
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: id) before it is passed to the reducer
    if current_id == id:
        current_list.append(line)
    else:
        if current_id:
            # write result to STDOUT
            print '%s\t%s' % (current_id, current_list)
        current_id = id
        current_list = [line]

# do not forget to output the last key if needed!
if current_id == id:
    print '%s\t%s' % (current_id, current_list)
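As a sanity check outside of Hadoop, the two scripts can be chained with a plain pipe (this assumes user_ids sits one directory above the current working directory, matching the relative path in the mapper):

cat test/input.txt | ./mapper.py | sort | ./reducer.py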
Now I run it with:
hadoop jar contrib/streaming/hadoop-streaming-1.1.1.jar -file ./mapper.py \
-mapper ./mapper.py -file ./reducer.py -reducer ./reducer.py \
-input test/input.txt -output test/output -file '../user_ids'
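My understanding is that -file should copy ../user_ids into each task's working directory, in which case the mapper might have to open it by its basename rather than the relative path, roughly like this (I am not sure whether this is correct):

f = open('user_ids', 'r')  # basename only, assuming -file places it in the task's cwd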
The job starts running:
13/11/07 05:04:52 INFO streaming.StreamJob: map 0% reduce 0%
13/11/07 05:05:21 INFO streaming.StreamJob: map 100% reduce 100%
13/11/07 05:05:21 INFO streaming.StreamJob: To kill this job, run:
but then I get the error:
job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201309172143_1390_m_000001
13/11/07 05:05:21 INFO streaming.StreamJob: killJob...
When I do not read the ids from the file ../user_ids, it does not give me any errors. I think the problem is that it cannot find my ../user_ids file. I have also tried a location in HDFS, but that still did not work. Thanks for your help.
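To see whether opening the file is really what fails, one thing I could do is guard the open in read_file and log to stderr, so the message shows up in the failed task's logs. This is just a sketch of the idea, using the same path as above:

import sys

def read_file(path='../user_ids'):
    id_list = []
    try:
        f = open(path, 'r')
    except IOError as e:
        # this message ends up in the streaming task's stderr log
        sys.stderr.write('could not open %s: %s\n' % (path, e))
        raise
    for line in f:
        id_list.append(line.strip())
    f.close()
    return id_list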