我正在尝试使用 MiniMRYarnCluster 在本地运行 MR 作业。我使用的是旧版 mapreduce(不是 YARN)和 mapreduce API v2,相关类可以在以下依赖中找到:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.0.0-cdh4.1.1</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
这是日志的一部分:
--127.0.1.1-58175-1358256748215, blockid: BP-1072059606-127.0.1.1-1358256746988:blk_6137856716359201843_1008, duration: 229871
13/01/15 17:32:34 INFO localizer.LocalizedResource: Resource hdfs://localhost:50123/apps_staging_dir/ssa/.staging/job_1358256748507_0001/job.xml transitioned from DOWNLOADING to LOCALIZED
13/01/15 17:32:34 INFO container.Container: Container container_1358256748507_0001_01_000001 transitioned from LOCALIZING to LOCALIZED
13/01/15 17:32:34 INFO container.Container: Container container_1358256748507_0001_01_000001 transitioned from LOCALIZED to RUNNING
13/01/15 17:32:34 INFO nodemanager.DefaultContainerExecutor: launchContainer: [bash, /home/ssa/devel/POIClusterMapreduceTest/ru.mrjob.poi.POIClusterMapreduceTest-localDir-nm-0_1/usercache/ssa/appcache/application_1358256748507_0001/container_1358256748507_0001_01_000001/default_container_executor.sh]
13/01/15 17:32:34 WARN nodemanager.DefaultContainerExecutor: Exit code from task is : 1
13/01/15 17:32:34 INFO nodemanager.ContainerExecutor:
13/01/15 17:32:34 WARN launcher.ContainerLaunch: Container exited with a non-zero exit code 1
这是抛出的异常:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/service/CompositeService
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.service.CompositeService
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
... 12 more
Could not find the main class: org.apache.hadoop.mapreduce.v2.app.MRAppMaster. Program will exit.
我使用 org.apache.hadoop.mapreduce.v2.TestMRJobs 作为我自己测试的基础。有没有人遇到过这个问题?
这是我的代码,它是一个抽象基类,用于在 CI 服务器或开发人员机器上本地测试 MR 作业:
public abstract class AbstractClusterMapReduceTest {
private static final Log LOG = LogFactory.getLog(AbstractClusterMapReduceTest.class);
public static final String DEFAULT_LOG_CATALOG = "local-mr-logs";
private static final int DEFAULT_NAMENODE_PORT = 50123;
private static final int ONE_DATANODE = 1;
private static final int DEFAULT_REDUCE_NUM_TASKS = 1;
private static final String SLASH = "/";
private static final String DEFAULT_MR_INPUT_DATA_FILE = "mr-input-data-file";
private MiniMRYarnCluster mrCluster;
private MiniDFSCluster dfsCluster;
/** Shitty code from base Cloudera example*/
private static Path TEST_ROOT_DIR = new Path("target",
AbstractClusterMapReduceTest.class.getName() + "-tmpDir").makeQualified(getLocalFileSystem());
static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
private static FileSystem getLocalFileSystem(){
try {
return FileSystem.getLocal(new Configuration());
} catch (IOException e) {
throw new Error("Can't access local file system. MR cluster can't be started", e);
}
}
/**
* Always provide path to log catalog.
* Default is: ${project.build.directory}/{@link AbstractClusterMapReduceTest#DEFAULT_LOG_CATALOG}
* */
protected String getPathToLogCatalog(){
return getPathToOutputDirectory()+ SLASH + DEFAULT_LOG_CATALOG;
}
private String getPathToOutputDirectory(){
return System.getProperty("project.build.directory");
}
private void checkAppJar(){
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
throw new Error("MRAppJar " + MiniMRYarnCluster.APPJAR+ " not found. Not running test.");
}else{
LOG.info(MiniMRYarnCluster.APPJAR + " is at the right place. Can continue to setup Env...");
}
}
public void setupEnv() throws IOException{
checkAppJar();
System.setProperty("hadoop.log.dir", getPathToLogCatalog());
System.setProperty("javax.xml.parsers.SAXParserFactory",
"com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");
dfsCluster = buildMiniDFSCluster();
//dfsCluster.getFileSystem().makeQualified(createPath(getHDFSPathToInputData()));
//dfsCluster.getFileSystem().makeQualified(createPath(getOutputPath()));
mrCluster = new MiniMRYarnCluster(this.getClass().getName(), 1);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", getFileSystem().getUri().toString()); // use HDFS
//conf.set(MRJobConfig.MR_AM_STAGING_DIR, getPathToOutputDirectory()+"/tmp-mapreduce");
conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
mrCluster.init(conf);
mrCluster.start();
//Cloudera tricks :)
// Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
// workaround the absent public discache.
getLocalFileSystem().copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
getLocalFileSystem().setPermission(APP_JAR, new FsPermission("700"));
}
public void tearDown() {
if (mrCluster != null) {
mrCluster.stop();
mrCluster = null;
}
if (dfsCluster != null) {
dfsCluster.shutdown();
dfsCluster = null;
}
}
public boolean createAndSubmitJob() throws IOException, ClassNotFoundException, InterruptedException{
LOG.info("createAndSubmitJob: enter");
checkAppJar();
LOG.info("MRAppJar has been found. Can start to create Job");
Configuration configuration = mrCluster.getConfig();
configuration.set(MRConfig.MASTER_ADDRESS, "local");
Job job = Job.getInstance(configuration);
job.setJobName(this.getClass().getSimpleName()+"-job");
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(getMRJobClass());
job.setJobName(getMRJobClass().getSimpleName());
job.setNumReduceTasks(getReduceNumTasks());
job.setOutputKeyClass(getOutputKeyClass());
job.setOutputValueClass(getOutputValueClass());
job.setMapperClass(getMapperClass());
job.setReducerClass(getReducerClass());
job.setInputFormatClass(getInputFormat());
job.setOutputFormatClass(getOutputFormat());
FileInputFormat.setInputPaths(job, getHDFSPathToInputData());
FileOutputFormat.setOutputPath(job, createPath(getOutputPath()));
job.setSpeculativeExecution(false);
job.setMaxMapAttempts(1); // speed up failures
LOG.info("Submitting job...");
job.submit();
LOG.info("Job has been submitted.");
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
LOG.info("trackingUrl:" +trackingUrl);
LOG.info("jobId:" +jobId);
return job.waitForCompletion(true);
}
protected FileSystem getFileSystem() throws IOException {
return dfsCluster.getFileSystem();
}
protected int getReduceNumTasks(){
return DEFAULT_REDUCE_NUM_TASKS;
}
/**
* @return InputStream instance to file you want to run with your MR job
* */
protected InputStream getInputStreamForInputData() {
return this.getClass().getClassLoader().getResourceAsStream(this.getClass().getSimpleName()+"/"+getInputDatasetName());
//return getPathToOutputDirectory()+ SLASH + DEFAULT_INPUT_CATALOG+"/mr-input-data";
}
protected String getHDFSPathToInputData() throws IOException{
InputStream inputStream = getInputStreamForInputData();
Path hdfsInputPath = new Path(DEFAULT_MR_INPUT_DATA_FILE);
FSDataOutputStream fsDataOutputStream = getFileSystem().create(hdfsInputPath);
copyStream(inputStream, fsDataOutputStream);
fsDataOutputStream.close();
inputStream.close();
return hdfsInputPath.toString();
}
private void copyStream(InputStream input, OutputStream output) throws IOException {
byte[] buffer = new byte[1024]; // Adjust if you want
int bytesRead;
while ((bytesRead = input.read(buffer)) != -1)
{
output.write(buffer, 0, bytesRead);
}
}
/**
* Dataset should be placed in resources/ConcreteClusterMapReduceTest
* @return a name of a file from catalog.
* */
protected abstract String getInputDatasetName();
/**
* @return path reducer output
* default is: @{link AbstractClusterMapReduceTest#DEFAULT_OUTPUT_CATALOG}
* */
protected String getOutputPath(){
return "mr-data-output";
}
/**
* Creates @{link Path} using absolute path to some FS resource
* @return new Path instance.
* */
protected Path createPath(String pathToFSResource){
return new Path(pathToFSResource);
}
/**
* Builds new instance of MiniDFSCluster
* Default: @{link DEFAULT_NAMENODE_PORT}, @{link DEFAULT_NAMENODE_PORT}
* @return MiniDFSCluster instance.
* */
protected MiniDFSCluster buildMiniDFSCluster() throws IOException {
return new MiniDFSCluster.Builder(new Configuration())
.nameNodePort(DEFAULT_NAMENODE_PORT)
.numDataNodes(ONE_DATANODE)
.build();
}
protected abstract Class<? extends Configured> getMRJobClass();
protected abstract Class<? extends Mapper> getMapperClass();
protected abstract Class<? extends Reducer> getReducerClass();
protected abstract Class<? extends InputFormat> getInputFormat();
protected abstract Class<? extends OutputFormat> getOutputFormat();
protected abstract Class<?> getOutputKeyClass();
protected abstract Class<?> getOutputValueClass();
}
具体的测试子类:
/**
 * Integration test that runs the POI MapReduce job ({@link Main} with {@link POIMapper}) on the
 * in-process HDFS/YARN clusters set up by {@link AbstractClusterMapReduceTest}.
 */
public class POIClusterMapreduceTest extends AbstractClusterMapReduceTest {
    private static final String INTEGRATION = "integration";

    /** Starts the mini clusters once for this test class. */
    @BeforeClass(groups = INTEGRATION)
    public void setup() throws IOException {
        super.setupEnv();
    }

    /**
     * Runs the job and checks that it succeeded and wrote a non-empty output.
     */
    // NOTE(review): @Test is commented out, so TestNG does not currently run this method.
    //@Test(groups = INTEGRATION)
    public void runJob() throws InterruptedException, IOException, ClassNotFoundException {
        boolean result = createAndSubmitJob();
        MatcherAssert.assertThat(result, Matchers.is(true));
        // The job's output path is a directory of part files (set via FileOutputFormat), and its
        // text output is not written with writeUTF. The old open(dir).readUTF() could not work, and
        // it leaked the stream. Check the total number of bytes written under the directory instead.
        long outputBytes = getFileSystem().getContentSummary(createPath(getOutputPath())).getLength();
        MatcherAssert.assertThat(outputBytes, Matchers.greaterThan(0L));
    }

    /** Shuts the mini clusters down after all tests in this class. */
    @AfterClass(groups = INTEGRATION)
    public void tearDown() {
        super.tearDown();
    }

    @Override
    protected Class<Main> getMRJobClass() {
        return Main.class;
    }

    @Override
    protected Class<POIMapper> getMapperClass() {
        return POIMapper.class;
    }

    @Override
    protected Class<Reducer> getReducerClass() {
        return Reducer.class;
    }

    @Override
    protected Class<TextInputFormat> getInputFormat() {
        return TextInputFormat.class;
    }

    @Override
    protected Class<TextOutputFormat> getOutputFormat() {
        return TextOutputFormat.class;
    }

    @Override
    protected Class<LongWritable> getOutputKeyClass() {
        return LongWritable.class;
    }

    @Override
    protected Class<XVLRDataWritable> getOutputValueClass() {
        return XVLRDataWritable.class;
    }

    @Override
    protected String getInputDatasetName() {
        return "mr-input-data";
    }
}