问题是如何使用 Hadoop Streaming(仅)在 Hadoop 中链接作业。
1 回答
这个答案是提问者实际提出的问题。我通常会引用它,但会弃权,因为它太大了。
这是关于如何使用Hadoop Streaming(当前为 1.0.3)链接两个或多个流作业的文档。
为了理解将执行链接的最终代码并能够编写任何其他链接工作,需要一些初步但实用的理论。
首先,Hadoop 中的工作是什么?Hadoop 作业是
hadoopJob = Configuration + Execution
在哪里,
配置 :使执行成为可能的所有设置。
执行 :完成所需任务的一组可执行文件或脚本文件。换句话说,就是我们任务的 map 和 reduce 步骤。
Configuration = hadoopEnvironment + userEnvironment
在哪里,
hadoopEnvironment :是Hadoop通用环境的搭建。这个通用环境是从资源定义的,即位于 $HADOOP_HOME/conf 目录中的 xml 文件。例如,一些资源是 core-site.xml、mapred-site.xml 和 hadoop-site.xml,它们分别定义了 hdfs 临时目录、作业跟踪器和集群节点数。
userEnvrironment :是运行作业时用户指定的参数。在 Hadoop 中,这些参数称为选项。
userEnvironment = genericOptions + streamingOptions
在哪里,
genericOptions:它们是通用的,因为它们对每个流式作业都有吸引力,独立于作业。它们是从 GenericsOptionsParser 处理的。
streamingOptions:它们是特定于工作的,因为它们对某个工作有吸引力。例如,每个作业都有自己的输入和输出目录或文件。它们由 StreamJob 处理。
示意性地,
hadoopJob
/\
/ \
/ \
/ \
/ \
Configuration Execution
/\ |
/ \ |
/ \ executable or script files
/ \
/ \
/ \
hadoopEnvironment userEnvironment
| /\
| / \
| / \
$HADOOP_HOME/conf / \
/ \
genericOptions streamingOptions
| |
| |
GenericOptionsParser StreamJob
任何人都可以看到,以上所有这些都是一系列配置。一部分是给集群的管理员(hadoopEnvironment),另一部分是给集群的用户(userEnvironment)。总而言之,如果我们暂时忘记执行部分,作业主要是抽象级别的配置。
我们的代码应该处理上述所有问题。现在我们准备好编写代码了。
首先,什么是代码级别的 Hadoop 作业?它是一个 jar 文件。每当我们提交作业时,我们都会提交一个带有一些命令行参数的 jar 文件。例如,当我们运行单个流式作业时,我们执行命令
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar -D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/
在哪里,
我们的工作是 hadoop-streaming-1.0.3.jar
使用命令行参数 -D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/
在这个 jar 里面有我们的类,它可以妥善处理所有事情。
所以,我们打开一个新的 java 文件,比如 TestChain.java,
// import everything needed
public class TestChain
{
//code here
public static void main( String[] args) throws Exception
{
//code here
}//end main
}//end TestChain
为了处理 hadoopEnvironment,我们的类应该继承类Configured。配置类使我们能够访问 Hadoop 的环境和参数,即前面提到的资源。资源是 xml 文件,其中包含名称/值对形式的数据。
展望未来,每个界面或多或少都是外部世界与世界想要完成的任务之间的媒介。也就是说,界面是我们用来完成任务的工具。因此,我们的类是一个工具。为此,我们的类必须实现Tool接口,该接口声明了一个方法 run()。这个方法定义了我们的工具行为,当然当接口实现时。最后,为了使用我们的工具,我们使用类ToolRunner。ToolRunner 通过类GenericOptionsParser也有助于处理来自 userEnvironment 的 genericOptions。
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
// import everything else needed
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
//code here
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
完成图片,方法run()也称为驱动程序,设置作业,包括作业的初始化和配置。请注意,我们通过方法 ToolRunner.run()的第一个参数“new Configuration ”委托给 ToolRunner 处理 hadoopEnnvironment 。
到目前为止我们做了什么?我们只是设置了我们的工具将在其中运行的环境。现在我们必须定义我们的工具,即进行链接。
只要每个链作业都是流式作业,我们就这样创建它们中的每一个。我们使用 StreamJob 类的 StreamJob.createJob(String[] args) 方法来做到这一点。Strings 的 args 矩阵包含每个作业的“命令行”参数。这些命令行参数是指 userEnvironment 的 streamingOptions(job specific)。此外,这些参数采用参数/值对的形式。例如,如果我们的作业有 in.txt 文件作为输入,/out/ 作为输出目录,m.py 作为映射器,r.py 作为减速器,那么,
String[] example = new String[]
{
"-mapper" , "m.py"
"-reducer" , "r.py"
"-input" , "in.txt"
"-output" , "/out/"
}
你必须注意两件事。首先,“-”是必要的。正是这个小东西将参数与值区分开来。这里,mapper 是一个参数,m.py 是它的值。区别从“-”来理解。其次,如果在参数的左"和"-"之间添加一个空格,则忽略此参数。如果我们有"-mapper",则"-mapper"不被视为参数。StreamJob 解析时,args 矩阵看起来对于成对的参数/值。最后一件事,回想一下,作业大致是一个配置。我们期望这样,StreamJob.creatJob() 应该返回一个配置或类似的东西。实际上 StreamJob.createJob() 返回一个JobConf对象.
假设我们要链接三个工作,
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
// import everything else needed
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
//code here
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
//code here
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "in3.txt"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
//code here
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
在这一点上,我们设置了我们的工具将要运行的环境并定义了它的行为。然而,我们还没有付诸行动。ToolRunner 还不够。ToolRunner,作为一个整体运行我们的工具。它不运行单个链作业。我们必须这样做。
有两种方法可以做到这一点。第一种方法是使用JobClient,第二种方法是使用JobControl。
第一种方式 - JobClient
使用 JobClient,我们按顺序运行链作业,通过为每个作业调用 JobClient,一个作业一个接一个地运行。运行每个单独作业的方法是 JobClient.runJob(jobtorun),其中 jobtorun 是 JobConf 对象。
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
JobClient.runJob( job1Conf);
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
JobClient.runJob( job2Conf);
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "in3.txt"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
JobClient.runJob( job3Conf);
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
使用 JobClient 的这种方式的一个优点是作业进度打印在标准输出上。
JobClient 的一个缺点是它不能处理作业之间的依赖关系。
第二种方式 - JobControl
使用 JobControl,所有链式作业都是一组作业的一部分。在这里,每个作业都在该组的框架中执行。这意味着必须首先将每个链作业添加到组中,然后运行该组。该组是一个 FIFO,或者该组中每个作业的执行都遵循 FCFS(先到先服务)模式。使用 JobControl.addJob(jobtoadd) 方法将每个作业添加到组中。
JobControl 可以通过方法 x.addDependingJob(y) 处理依赖关系,其中作业 x 依赖于作业 y。这意味着,在作业 y 完成之前,作业 x 无法运行。如果作业 x 依赖于作业 y 和 z 并且 z 独立于 y,那么使用 x.addDependingJob(y) 和 x.addDependingJob(z) 我们可以表达这些依赖关系。
JobControl 与 JobClient 相矛盾,与Job对象“工作”。例如,当我们调用 x.addDependingJob(y) 方法时,x, y 是 Job 对象。JobControl.addJob(jobtoadd) 也是如此,jobtoadd 是一个 Job 对象。每个 Job 对象都是从 JobConf 对象创建的。回到我们拥有的代码,
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
//TestChain below is an arbitrary name for the group
JobControl jobc = new JobControl( "TestChain");
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
Job job1 = new Job( job1conf);
jobc.addJob( job1);
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
Job job2 = new Job( job2conf);
jobc.addJob( job2);
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "/out2/par*"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
Job job3 = new Job( job3conf);
job3.addDependingJob( job2);
jobc.addJob( job3);
//code here
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
在上面的代码中,请注意 job3 依赖于 job2。如您所见,job3 的输入是 job2 的输出。这个事实是一个依赖。job3 等到 job2 完成。
到目前为止,我们只是在组中添加了连锁作业并描述了它们的依赖关系。我们需要最后一件事来运行这组作业。
蛮力说只调用方法 JobControl.run()。尽管这种方法有效,但它是有问题的。当链作业完成后,整个作业仍然会永远运行。一种正常工作的方法是从我们已经存在的作业线程定义一个新的执行线程(当作业运行时)。然后我们可以等到链作业完成后再退出。在链式作业执行的同时,我们可以询问作业执行信息,例如有多少作业已经完成,或者我们可以找到一个作业是否处于无效状态,这是什么。
这种使用 JobControl 的方式的一个优点是可以处理作业之间可能存在的许多依赖关系。
JobControl 的一个缺点是作业进度不会打印在标准输出上,也不会直接呈现。无论作业失败还是成功,都不会打印任何有用的信息。您必须检查 Hadoop 的 Web UI 或在下面的 while 循环中添加一些代码来跟踪作业的状态或所需的任何内容。最后,
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
//TestChain below is an arbitrary name for the group
JobControl jobc = new JobControl( "TestChain");
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
Job job1 = new Job( job1conf);
jobc.addJob( job1);
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
Job job2 = new Job( job2conf);
jobc.addJob( job2);
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "/out2/par*"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
Job job3 = new Job( job3conf);
job3.addDependingJob( job2);
jobc.addJob( job3);
Thread runjobc = new Thread( jobc);
runjobc.start();
while( !jobc.allFinished())
{
//do whatever you want; just wait or ask for job information
}
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
错误
本节讨论可能发生的一些错误。在下面的错误消息中有一个类 OptimizingJoins。这个类只是为了演示各种错误的类,与这个讨论无关。
尝试编译时包不存在。
这是类路径的问题。像这样编译(例如添加 hadoop-streaming-1.0.3.jar 包),
javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java
并添加任何丢失的包。
java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob
总误差是,
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob
at OptimizingJoins.run(OptimizingJoins.java:135)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at OptimizingJoins.main(OptimizingJoins.java:248)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.streaming.StreamJob
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
... 8 more
这是我们 jar 文件的清单文件的问题。当我们以上述方式编译我们的工作时,一切都很好。Java 编译器会找到它需要的任何东西。但是当我们通过命令在 Hadoop 中运行我们的工作时
$HADOOP_HOME/bin/hadoop jar /home/hduser/TestChain.jar TestChain
然后运行我们的 jar 的 JVM 找不到 StreamJob。为了解决这个问题,当我们创建 jar 文件时,我们在 jar 中放入了一个包含 StreamJob 的类路径的清单文件。几乎,
MANIFEST.MF
Manifest-Version: 1.0
Class-Path: /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar
Created-By: 1.7.0_07 (Oracle Corporation)
请注意,MANIFEST.MF 文件始终以空行结尾。我们的 MANIFEST.MF 文件有 4 行,而不是 3 行。然后我们创建 jar 文件,例如,
jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class
错误流。StreamJob:无法识别的选项:-D
发生此错误是因为 StreamJob 无法解析 -D 选项。StreamJob 只能解析流式传输、作业特定选项,-D 是通用选项。
这个问题有两种解决方案。第一个解决方案是使用 -jobconf 选项而不是 -D。第二种解决方案是通过 GenericOptionsParser 对象解析 -D 选项。当然,在第二种解决方案中,您必须从 StreamJob.createJob() 参数中删除 -D 选项。
举个例子,第二种解决方案的“干净”代码实现是,
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
public class TestChain
{
public class Job1 extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
JobClient.runJob( job1Conf);
return 0;
}//end run
}
public class Job2 extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
JobClient.runJob( job2Conf);
return 0;
}//end run
}
public class Job3 extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "in3.txt"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
JobClient.runJob( job3Conf);
return 0;
}//end run
}
public static void main( String[] args) throws Exception
{
TestChain tc = new TestChain();
//Domination
String[] j1args = new String[]
{
"-D", "mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator",
"-D", "mapred.text.key.comparator.options=-k1,1" ,
"-D", "mapred.reduce.tasks=1"
};
// Let ToolRunner handle generic command-line options
int j1res = ToolRunner.run( new Configuration(), tc.new Job1(), j1args);
//Cost evaluation
String[] j2rgs = new String[]
{
"-D", "mapred.reduce.tasks=12 " ,
"-D", "mapred.text.key,partitioner.options=-k1,1"
};
// Let ToolRunner handle generic command-line options
int j2res = ToolRunner.run( new Configuration(), tc.new Job2(), j2args);
//Minimum Cost
String[] j3args = new String[]
{
"-D", "mapred.reduce.tasks=1"
};
// Let ToolRunner handle generic command-line options
int j3res = ToolRunner.run( new Configuration(), tc.new Job1(), j3args);
System.exit( mres);
}
}//end TestChain
在上面的代码中,我们定义了一个封装链作业的全局类 TestChain。然后我们定义每个单独的链作业,即我们定义它的运行方法。每一个链式作业都是一个继承Configured并实现Tool的类。最后,从 TestChain 的 main 方法中,我们依次运行每个作业。请注意,在运行任何链作业之前,我们定义了它的通用选项。
编译
javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java
罐
jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class TestChain\$Dom.class TestChain\$Cost.class TestChain\$Min.class
错误 security.UserGroupInformation: PriviledgedActionException as:hduser cause:org.apache.hadoop.mapred.InvalidInputException: Input Pattern hdfs://localhost:54310/user/hduser/whateverFile 匹配 0 个文件
当我们使用 JobControl 时会发生此错误。例如,如果一个作业将前一个作业的输出作为输入,那么如果该输入-输出文件不存在,则会发生此错误。JobControl 以“并行”的方式运行所有独立的作业,而不是像 JobClient 那样一一进行。因此,Jobcontrol 尝试运行其输入文件不存在且因此失败的作业。
为了避免这种情况,我们使用 x.addDependingJob(y) 声明这两个作业之间存在依赖关系,作业 x 依赖于作业 y。现在,JobControl 不会尝试在并行相关作业中运行。