我正在尝试使用二进制可执行文件和缓存的存档运行一个简单的示例,但它似乎不起作用:
我正在尝试运行的示例有一个映射器,它生成三个随机双精度数和一个键,reducer 会将这三个数字平均在一起并记录平均值。很简单的东西。我在 c 中写了一个简单的 EXE 来生成随机数:
#include <cstdio>
#include <stdlib.h>
#include <time.h>
int main(int argc, char*argv[]) {
srand ( time(NULL) );
int rand1 = rand() % 10 + 1;
int rand2 = rand() % 10 + 1;
int rand3 = rand() % 10 + 1;
printf("%s, %f, %f, %f", argv[1], (float)rand1/5, (float)rand2/5, (float)rand3/5);
return 0;
}
所以如果我打电话给 ./a.out [key]
我再看看吧
键,随机1,随机2,随机3
我正在使用 python 流,这是我用 python 编写的映射器:
#!/usr/bin/python
import os
import sys
import random
import shlex, subprocess
from subprocess import PIPE
from misc import *
for line in sys.stdin:
line = line.strip()
sline = line.split(',')
# parse out the au and cost from the line ....
key = int( sline[0] )
au = int( sline[1])
cost = float( sline[2] )
command = "./a.out %d" % ( key )
cli_parts = shlex.split(command)
mp = subprocess.Popen(cli_parts, stdin=PIPE, stderr=PIPE,
stdout=PIPE)
print mp.communicate()[0].strip('\r\n')
这是只进行平均的减速器:
#!/usr/bin/python
import os
import sys
import math
import re
from misc import *
for line in sys.stdin:
line = line.strip()
m = re.match("(\w+),\s+(.*),\s+(.*),\s+(.*)", line)
if m:
average = (float(m.groups(0)[1]) + float(m.groups(0)[2]) +
float(m.groups(0)[3])) / 3
print "key: %d average: %f" % ( int(m.groups(0)[0]), average )
else:
print "not found"
#f.close()
所以在阅读了文档之后,似乎我需要编译二进制文件和 tar.gz-it
1) 焦油 cvaf a.out.tar.gz a.out
现在我应该能够通过 -cacheArchive 参数将它传递给数据节点,并且一切都应该正常工作。这是我的 Hadoop 命令:
hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2+737.jar \ -numReduceTasks 1 \ -mapper mapper1.py \ -file mapper1.py \ -reducer reducer1.py \ -文件 reducer1.py \ -file misc.py \ -cacheArchive a.out.tar.gz \ -input input/* \ -output testsvmoutput \ -verbose
不用说,这不起作用,似乎是因为映射器没有生成数据。
我通过在命令行上测试它来确认我的代码有效:
猫输入/svminput1.txt | python mapper1.py | 排序 | python reducer1.py
我很想有人解释为什么这不起作用,如何通过 cacheArchive 命令传递 exe 在数据节点上工作,和/或如何调试它,因为来自 Cloudera html 面板的错误消息没有那么有用。
谢谢
这是我看到的错误:
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:317)
at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1063)
at org.apache.hadoop.mapred.Child.main(Child.java:211)