1

我正在尝试使用二进制可执行文件和缓存的存档运行一个简单的示例,但它似乎不起作用:

我正在尝试运行的示例有一个映射器,它生成三个随机双精度数和一个键,reducer 会将这三个数字平均在一起并记录平均值。很简单的东西。我在 c 中写了一个简单的 EXE 来生成随机数:

#include <cstdio>
#include <stdlib.h>
#include <time.h> 

int main(int argc, char*argv[]) {
    srand ( time(NULL) );
    int rand1 = rand() % 10 + 1;
    int rand2 = rand() % 10 + 1;
    int rand3 = rand() % 10 + 1;
    printf("%s, %f, %f, %f", argv[1], (float)rand1/5, (float)rand2/5, (float)rand3/5);
    return 0; 
}

所以如果我打电话给 ./a.out [key]

我再看看吧

键,随机1,随机2,随机3

我正在使用 python 流,这是我用 python 编写的映射器:


#!/usr/bin/python

import os
import sys
import random
import shlex, subprocess
from subprocess import PIPE

from misc import *

for line in sys.stdin:
    line = line.strip()
    sline = line.split(',')

    # parse out the au and cost from the line ....
    key = int( sline[0] )
    au = int( sline[1])
    cost = float( sline[2] )

    command = "./a.out %d" % ( key )
    cli_parts = shlex.split(command)
    mp = subprocess.Popen(cli_parts, stdin=PIPE, stderr=PIPE,
stdout=PIPE)
    print mp.communicate()[0].strip('\r\n')

这是只进行平均的减速器:


#!/usr/bin/python

import os
import sys
import math
import re

from misc import *

for line in sys.stdin:
    line = line.strip()
    m = re.match("(\w+),\s+(.*),\s+(.*),\s+(.*)", line)
    if m:
        average = (float(m.groups(0)[1]) + float(m.groups(0)[2]) +
float(m.groups(0)[3])) / 3
        print "key: %d average: %f" % ( int(m.groups(0)[0]), average )
    else:
        print "not found"

#f.close()

所以在阅读了文档之后,似乎我需要编译二进制文件和 tar.gz-it

1) 焦油 cvaf a.out.tar.gz a.out

现在我应该能够通过 -cacheArchive 参数将它传递给数据节点,并且一切都应该正常工作。这是我的 Hadoop 命令:

hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2+737.jar \ -numReduceTasks 1 \ -mapper mapper1.py \ -file mapper1.py \ -reducer reducer1.py \ -文件 reducer1.py \ -file misc.py \ -cacheArchive a.out.tar.gz \ -input input/* \ -output testsvmoutput \ -verbose

不用说,这不起作用,似乎是因为映射器没有生成数据。

我通过在命令行上测试它来确认我的代码有效:

猫输入/svminput1.txt | python mapper1.py | 排序 | python reducer1.py

我很想有人解释为什么这不起作用,如何通过 cacheArchive 命令传递 exe 在数据节点上工作,和/或如何调试它,因为来自 Cloudera html 面板的错误消息没有那么有用。

谢谢

这是我看到的错误:

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:317)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1063)
    at org.apache.hadoop.mapred.Child.main(Child.java:211)
4

2 回答 2

1

我看到你做错了一些事情。你想设置你的 python 脚本 chmod a+x 并像这样进行测试 cat input/svminput1.txt | ./mapper1.py | 排序 | ./reducer1.py 因为这基本上是 Hadoop 在流式传输中所做的就是启动脚本(操作系统使用正确的解释器处理执行脚本)

现在,对于移动到作业中以与您的映射器和减速器一起使用的其他文件,您只需通过命令行将它们添加到您想要的文件中(就像您使用 misc.py 一样),并且当您的映射/缩减启动时,这些文件是本地的“。 " 到你的脚本,所以导入并使用它们或你想要的任何东西(打开一个文本文件,无论你想要什么)......你应该用 chacheArchive 的东西来做这件事,也只需将它们每个都推送为 -file 应该没问题。

如果您还没有看过,这里是 Python 流式传输http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/的一个非常基本的文章

这是一个更高级的带有连接和键的python流http://allthingshadoop.com/2010/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/这可能会有所帮助还。

希望这会有所帮助,如果我认为不再需要执行特定的错误

于 2010-12-29T16:28:33.730 回答
0

你确定python/usr/bin/python在集群机器上可用吗?一个好的做法是始终#!/usr/bin/env python在脚本的顶部使用......这样它就不会被硬编码。

还要确保检查集群机器上的 python 安装......确保导入工作。

您没有在代码中使用任何尝试/例外,因此很难调试问题所在...我建议尝试/排除您的代码并将日志消息打印到众所周知的位置,例如/tmp...。

有关更多信息,您可以查看davidvhill.com/articles....我的实际生产代码在此处捕获....

于 2011-08-18T00:10:34.560 回答