2

我正在尝试链接一些流式作业(用 Python 编写的作业)。我做到了,但是 -D 命令有问题。这是代码,

public class OJs extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
    //DOMINATION
    Path domin      = new Path( "diploma/join.txt");
    //dominationm.py
    Path domout     = new Path( "mapkeyout/");
    //dominationr.py

    String[] dom = new String[]
    {
        "-D mapred.reduce.tasks=0",
        "-file"     , "/home/hduser/optimizingJoins/dominationm.py" ,
        "-mapper"   , "dominationm.py"                              ,
        "-file"     , "/home/hduser/optimizingJoins/dominationr.py" ,
        "-reducer"  , "dominationr.py",         
        "-input"    , domin.toString()                              ,
        "-output"   , domout.toString()
    };
    JobConf domConf = new StreamJob().createJob( dom);
    //run domination job
    JobClient.runJob( domConf);
    return 0;
}//end run

public static void main( String[] args) throws Exception
{
    int res = ToolRunner.run( new Configuration(), new OJs(), args);
    System.exit( res);
}//end main
}//end OJs

我的问题是命令“-D mapred.reduce.tasks=0”。我得到这个错误,

ERROR streaming.StreamJob: Unrecognized option: -D...

其中 ... 包括任何可能的语法组合,即

"-D mapred.reduce.tasks=0"
"-Dmapred.reduce.tasks=0"
"-D", "mapred.reduce.tasks=0"
"-D", "mapred.reduce.tasks=", "0"
" -D mapred.reduce.tasks=0"

等等

当我在 -D 之前有一个空格时,则忽略此命令。我没有指定的减速器数量。当我没有这个空间时,我会收到我提到的错误。

我究竟做错了什么?

编辑

用 -jobconf 替换 -D 选项并不能解决问题。这是整个错误输出,

 Warning: $HADOOP_HOME is deprecated.

    12/10/04 00:25:02 ERROR streaming.StreamJob: Unrecognized option: -jobconf mapred.reduce.tasks=0
    Usage: $HADOOP_HOME/bin/hadoop jar \
          $HADOOP_HOME/hadoop-streaming.jar [options]

    Options:
   -input    <path>     DFS input file(s) for the Map step

   -output   <path>     DFS output directory for the Reduce step

   -mapper   <cmd|JavaClassName>      The streaming command to run

   -combiner <cmd|JavaClassName> The streaming command to run

   -reducer  <cmd|JavaClassName>      The streaming command to run

   -file     <file>     File/dir to be shipped in the Job jar file

   -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.

   -outputformat TextOutputFormat(default)|JavaClassName  Optional.

   -partitioner JavaClassName  Optional.

   -numReduceTasks <num>  Optional.

   -inputreader <spec>  Optional.

   -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands

   -mapdebug <path>  Optional. To run this script when a map task fails 

   -reducedebug <path>  Optional. To run this script when a reduce task fails 

   -io <identifier>  Optional.

   -verbose

   Generic options supported are

   -conf <configuration file>     specify an application configuration file

   -D <property=value>            use value for given property

   -fs <local|namenode:port>      specify a namenode

   -jt <local|jobtracker:port>    specify a job tracker

   -files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster

   -libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.

   -archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

   The general command line syntax is
   bin/hadoop command [genericOptions] [commandOptions]


   For more details about these options:

   Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info

   Exception in thread "main" java.lang.IllegalArgumentException: 
    at org.apache.hadoop.streaming.StreamJob.fail(StreamJob.java:549)
    at org.apache.hadoop.streaming.StreamJob.exitUsage(StreamJob.java:486)
    at org.apache.hadoop.streaming.StreamJob.parseArgv(StreamJob.java:246)
    at org.apache.hadoop.streaming.StreamJob.createJob(StreamJob.java:143)
    at OJs.run(OJs.java:135)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at OJs.main(OJs.java:183)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

此外,我无法理解,为什么当我直接使用 Streaming 运行作业时,Streaming 会识别 -D 选项,但是当我通过 JobClient 使用 Streaming 运行作业时,-D 选项识别失败。是 Streaming 的问题还是 sun.reflect 的问题?Ubuntu 中的 sun.reflect 包在哪里?

4

2 回答 2

3

看起来 StreamJob 不支持-Dkey=value通用配置选项。

请参阅http://wiki.apache.org/hadoop/HadoopStreaming,但看起来您需要使用(并且在该页面上作为示例明确调用):

-jobconf mapred.reduce.tasks=0
于 2012-10-02T00:53:53.447 回答
1

首先,这条线

..."-D mapred.reduce.tasks=0"...

应该写成

..."-D", "mapred.reduce.tasks=0"...

这是命令的标准模式,

"-commandname", "value"

要继续,程序通常可以接受或不接受某些参数。Hadoop 上下文中的这些参数称为选项。它们有两种,通用的和流式的,工作特定的。通用选项GenericOptionsParser处理。Hadoop Streaming 上下文中的作业特定选项由 StreamJob 处理。

所以,在初始问题的代码中设置 -D 选项的方式是错误的。这是因为 -D 是一个通用选项。StreamJob 无法处理通用选项。StreamJob 可以处理 -jobconf 但是,这是一个特定于作业的选项。所以这条线

..."-D", "mapred.reduce.tasks=0"...

正确写为

..."-jobconf", "mapred.reduce.tasks=0"...

使用 -jobconf 会引发此警告,

WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.

为避免此警告,需要 -D 选项,因此需要 GenericOptionsParser 来解析 -D 选项。

继续前进,当有人使用命令运行流式作业时

bin/hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-streaming-*.jar [ generic options] [ streaming( job specific) options]

真的会发生什么?为什么在这种情况下没有问题?在这种情况下,通用和特定于作业的选项都被正确解析。这是可能的,因为 Tool 接口通过 GenericOptionsParser 处理通用选项。作业特定选项由 hadoop-streaming-*.jar 中的 StreamJob() 处理。

实际上 hadoop-streaming-*.jar 有一个文件“HadoopStreaming.java”负责以上述方式提交的作业。HadoopStreaming 类使用两个参数调用 ToolRunner.run()。第一个参数是一个新的 StreamJob 对象,第二个参数包含所有命令行选项,即 [通用选项] 和 [流(作业特定)选项]。GenericOptionsParser 通过仅解析通用选项将通用选项与作业特定选项分开。然后,GenericOptionsParser 返回从 StreamJob() 解析的其余选项,即特定于作业的选项。StreamJob 通过 Tool.run([job specific args]) 调用,其中 Tool = StreamJob。看到这个这个有一个直觉为什么 Tool = StreamJob。

综上所述,

GenericOptionsParser -> 通用选项,

StreamJob -> 流式传输(作业特定)选项。

于 2012-10-09T10:03:21.047 回答