
What are my options for developing Java MapReduce jobs in Eclipse? My end goal is to run the map/reduce logic I develop on my Amazon Hadoop cluster, but I'd like to test the logic on my local machine first, with breakpoints in place, before deploying it to the larger cluster.

I see there is a Hadoop plugin for Eclipse, but it looks quite old (correct me if I'm wrong), and a company called Karmasphere used to offer something for Eclipse and Hadoop, but I'm not sure it's still available.

How do you use Eclipse to develop, test, and debug your map/reduce jobs?


3 Answers


I develop my Cassandra/Hadoop applications in Eclipse by:

  1. Using Maven (m2e) to gather and configure the dependencies (Hadoop, Cassandra, Pig, etc.) for my Eclipse projects

  2. Creating test cases (classes in src/test/java) that test my mappers and reducers. The trick is to build a context object on the fly using inner classes that extend RecordWriter and StatusReporter. If you do this, then after invoking setup/map/cleanup or setup/reduce/cleanup you can assert that the correct key/value pairs and context information were written by the mapper or reducer. The context constructors in both mapred and mapreduce look ugly, but you'll find that the classes are easy to instantiate.

  3. Once you've written these tests, Maven will invoke them automatically every time you build.

  4. You can invoke the tests manually by selecting the project and doing Run --> Maven Test. This turns out to be very handy because the tests are invoked in debug mode, so you can set breakpoints in your mappers and reducers and do all the cool things Eclipse lets you do in the debugger.

  5. Once you're happy with the quality of your code, use Maven to build a jar-with-dependencies: everything ends up in a single jar, which Hadoop is quite happy with.

As a side note, I've built a number of code-generation tools based on the Eclipse M2T JET project. They generate the infrastructure for everything I mentioned above, and I just write the logic for my mappers, reducers, and test cases. I think if you gave it some thought, you could come up with a set of reusable classes that you could extend to do much the same thing.

Here's a sample test case class:

/*
 * 
 * This source code and information are provided "AS-IS" without 
 * warranty of any kind, either expressed or implied, including
 * but not limited to the implied warranties of merchantability
 * and/or fitness for a particular purpose.
 * 
 * This source code was generated using an evaluation copy 
 * of the Cassandra/Hadoop Accelerator and may not be used for
 * production purposes.
 *
 */
package com.creditco.countwords.ReadDocs;

// Begin imports 

import java.io.IOException;
import java.util.ArrayList;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.junit.Test;

// End imports 

public class ParseDocsMapperTest extends TestCase {

    @Test
    public void testCount() {

        TestRecordWriter    recordWriter    = new TestRecordWriter();
        TestRecordReader    recordReader    = new TestRecordReader();
        TestOutputCommitter outputCommitter = new TestOutputCommitter();
        TestStatusReporter  statusReporter  = new TestStatusReporter();
        TestInputSplit      inputSplit      = new TestInputSplit();

        try {

                // Begin test logic


                // Get an instance of the mapper to be tested and a context instance
            ParseDocsMapper mapper = new ParseDocsMapper();

            Mapper<LongWritable,Text,Text,IntWritable>.Context context = 
                mapper.testContext(new Configuration(), new TaskAttemptID(),recordReader,recordWriter,outputCommitter,statusReporter,inputSplit);

                // Invoke the setup, map and cleanup methods
            mapper.setup(context);

            LongWritable key = new LongWritable(30);
            Text value = new Text("abc def ghi");

            mapper.map(key, value, context);

            if (recordWriter.getKeys().length != 3) {
                fail("com.creditco.countwords:ParseDocsMapperTest.testCount() - Wrong number of records written ");
            }
            mapper.cleanup(context);

                // Validation:
                //
                // recordWriter.getKeys() returns the keys written to the context by the mapper
                // recordWriter.getValues() returns the values written to the context by the mapper
                // statusReporter returns the most recent status and any counters set by the mapper
                //

                // End test logic

        } catch (Exception e) {
            fail("com.creditco.countwords:ParseDocsMapperTest.testCount() - Exception thrown: "+e.getMessage());
        }

    }

    final class TestRecordWriter extends RecordWriter<Text, IntWritable> {
        ArrayList<Text> keys = new ArrayList<Text>();
        ArrayList<IntWritable> values = new ArrayList<IntWritable>();
        public void close(TaskAttemptContext arg0) throws IOException, InterruptedException { }
        public void write(Text key, IntWritable value) throws IOException, InterruptedException {
            keys.add(key);
            values.add(value);
        }
        public Text[] getKeys() {
            Text result[] = new Text[keys.size()];
            keys.toArray(result);
            return result;
        }
        public IntWritable[] getValues() {
            IntWritable[] result = new IntWritable[values.size()];
            values.toArray(result);
            return result;
        }
    };  

    final class TestRecordReader extends RecordReader<LongWritable, Text> {
        public void close() throws IOException { }
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            throw new RuntimeException("Tried to call RecordReader:getCurrentKey()");
        }
        public Text getCurrentValue() throws IOException, InterruptedException {
            throw new RuntimeException("Tried to call RecordReader:getCurrentValue()");
        }
        public float getProgress() throws IOException, InterruptedException {
            throw new RuntimeException("Tried to call RecordReader:getProgress()");
        }
        public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { }
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return false;
        }
    };

    final class TestStatusReporter extends StatusReporter {
        private Counters counters = new Counters();
        private String status = null;
        public void setStatus(String arg0) {
            status = arg0;
        }
        public String getStatus() {
            return status;
        }
        public void progress() { }
        public Counter getCounter(String arg0, String arg1) {
            return counters.getGroup(arg0).findCounter(arg1);
        }
        public Counter getCounter(Enum<?> arg0) {
            return null;
        }
    };

    final class TestInputSplit extends InputSplit {
        public String[] getLocations() throws IOException, InterruptedException {
            return null;
        }
        public long getLength() throws IOException, InterruptedException {
            return 0;
        }
    };

    final class TestOutputCommitter extends OutputCommitter {
        public void setupTask(TaskAttemptContext arg0) throws IOException { }
        public void setupJob(JobContext arg0) throws IOException { }
        public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
            return false;
        }
        public void commitTask(TaskAttemptContext arg0) throws IOException { }
        public void cleanupJob(JobContext arg0) throws IOException { }
        public void abortTask(TaskAttemptContext arg0) throws IOException { }
    };

}
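
The testContext(...) helper called above belongs to the generated mapper class, so it isn't shown here. As a rough sketch of what such a helper might look like (this is an assumption on my part, not part of the generated code; it relies on Hadoop 0.20.x, where Mapper.Context has a public constructor with exactly this argument list):

// A hypothetical test-only helper, declared inside ParseDocsMapper.
// On Hadoop 0.20.x, Mapper.Context can be instantiated directly --
// this is the "ugly but easy" constructor mentioned above.
public Context testContext(
        Configuration conf, TaskAttemptID taskId,
        RecordReader<LongWritable, Text> reader,
        RecordWriter<Text, IntWritable> writer,
        OutputCommitter committer, StatusReporter reporter,
        InputSplit split) throws IOException, InterruptedException {
    return new Context(conf, taskId, reader, writer, committer, reporter, split);
}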

Here's a sample Maven POM. Note that the versions referenced are a bit out of date, but as long as those versions are still hosted in some Maven repository, you'll be able to build this project.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.creditco</groupId>
  <artifactId>wordcount.example</artifactId>
  <version>0.0.1-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
  <dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>0.20.2</version>
        <type>jar</type>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.cassandra</groupId>
        <artifactId>cassandra-all</artifactId>
        <version>1.0.6</version>
        <type>jar</type>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.cassandraunit</groupId>
        <artifactId>cassandra-unit</artifactId>
        <version>1.0.1.1</version>
        <type>jar</type>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <artifactId>hamcrest-all</artifactId>
                <groupId>org.hamcrest</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.pig</groupId>
        <artifactId>pig</artifactId>
        <version>0.9.1</version>
        <type>jar</type>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20090211</version>
        <type>jar</type>
        <scope>compile</scope>
    </dependency>
  </dependencies>
</project>
Answered 2012-06-13T02:57:01.473

I use the MiniMRCluster that ships with Apache Hadoop. You can start a mini MapReduce cluster right inside a unit test! HBase also has HBaseTestingUtility, which is great because you can start HDFS and MapReduce in about two lines.
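
A minimal sketch of that startup (assuming an HBase release of that era, such as 0.94.x, where these HBaseTestingUtility methods exist):

import org.apache.hadoop.hbase.HBaseTestingUtility;

public class MiniClusterSketch {
    public static void main(String[] args) throws Exception {
        HBaseTestingUtility util = new HBaseTestingUtility();
        util.startMiniCluster();            // in-process ZooKeeper, HDFS and HBase
        util.startMiniMapReduceCluster();   // in-process MapReduce on that HDFS
        try {
            // submit jobs against util.getConfiguration() here
        } finally {
            util.shutdownMiniMapReduceCluster();
            util.shutdownMiniCluster();
        }
    }
}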

Answered 2013-10-03T06:35:39.993

@Chris Gerken - I tried to run a word count job in Eclipse by running the driver as a Java Application, but I get a ClassNotFoundException on the Mapper. It seems to me that when it's run as a plain Java application, the Hadoop job doesn't get the Mapper and Reducer it needs from a jar.
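
For context, a driver normally identifies the jar containing the Mapper and Reducer via Job.setJarByClass(...); a minimal driver sketch of that mechanism (class names here are hypothetical, not from the thread):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "wordcount");
        // Point Hadoop at the jar containing the job classes; without a job
        // jar, remote tasks can fail with ClassNotFoundException on the Mapper.
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);   // hypothetical mapper class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}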

Answered 2013-12-29T01:18:49.937