
I am trying to test a simple MapReduce project with MRUnit. I set the input on the mapDriver and then call mapDriver.runTest() (I also tried mapDriver.run(), which produces the same error).

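I wrote a custom key that implements the write(DataOutput out), readFields(DataInput in), and compareTo(...) methods. When debugging, I can see the key being written out with write(DataOutput out). However, after the key's readFields(DataInput in) method completes (it correctly retrieves the data previously written by write(DataOutput out)), the following error is thrown.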

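I searched here for similar posts and tried overriding the hashCode() and equals() methods, to no avail. Does MRUnit require any other methods to be overridden when a custom key is used? The most similar post is "MRUnit with Avro NullPointerException in Serialization". However, I am not using Avro and, as far as I know, I am using the default serialization. Cheers!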

java.lang.NullPointerException
at org.apache.hadoop.mrunit.Serialization.copy(Serialization.java:61)
at org.apache.hadoop.mrunit.Serialization.copy(Serialization.java:81)
at org.apache.hadoop.mrunit.mapreduce.mock.MockContextWrapper$4.answer(MockContextWrapper.java:78)
at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:31)
at org.mockito.internal.MockHandler.handle(MockHandler.java:97)
at org.mockito.internal.creation.MethodInterceptorFilter.intercept(MethodInterceptorFilter.java:47)
at org.apache.hadoop.mapreduce.Mapper$Context$$EnhancerByMockitoWithCGLIB$$f555e120.write(<generated>)
at model.RMSEEvaluation$Mapper.map(RMSEEvaluation.java:57)
at model.RMSEEvaluation$Mapper.map(RMSEEvaluation.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mrunit.mapreduce.MapDriver.run(MapDriver.java:221)
at org.apache.hadoop.mrunit.MapDriverBase.runTest(MapDriverBase.java:150)
at org.apache.hadoop.mrunit.TestDriver.runTest(TestDriver.java:137)
at test.TestRMSEEvaluation.testSetValues(TestRMSEEvaluation.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at junit.framework.TestCase.runTest(TestCase.java:168)
at junit.framework.TestCase.runBare(TestCase.java:134)
at junit.framework.TestResult$1.protect(TestResult.java:110)
at junit.framework.TestResult.runProtected(TestResult.java:128)
at junit.framework.TestResult.run(TestResult.java:113)
at junit.framework.TestCase.run(TestCase.java:124)
at junit.framework.TestSuite.runTest(TestSuite.java:243)
at junit.framework.TestSuite.run(TestSuite.java:238)
at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:83)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)

2 Answers


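I found a solution to this error. It occurred because the serialization types were not set in the Configuration for MapDriver mapDriver. I had to set the serializations explicitly, as follows: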

Configuration conf = new Configuration();
conf.set("io.serializations","org.apache.hadoop.io.serializer.JavaSerialization," 
            + "org.apache.hadoop.io.serializer.WritableSerialization");
mapDriver.setConfiguration(conf);
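
One way to see why the registered list matters: Hadoop's SerializationFactory walks the classes named in io.serializations and uses the first one whose accept(Class) check passes. WritableSerialization accepts classes that implement Writable, and JavaSerialization accepts classes that implement java.io.Serializable. The sketch below models only that lookup in plain Java. WritableLike, the three key classes, and firstAccepting are hypothetical stand-ins rather than Hadoop or MRUnit code, and the post does not confirm that this lookup was the cause of the stack trace in the question.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class SerializationLookupSketch {

    /** Hypothetical stand-in for org.apache.hadoop.io.Writable, so no Hadoop jar is needed. */
    interface WritableLike {}

    // Hypothetical key classes covering the three interesting cases.
    static class WritableKey implements WritableLike {}
    static class SerializableKey implements Serializable {}
    static class PlainKey {}

    // Each "serialization" is modelled only by its accept(Class) check.
    static final Predicate<Class<?>> WRITABLE =
        c -> WritableLike.class.isAssignableFrom(c);
    static final Predicate<Class<?>> JAVA =
        c -> Serializable.class.isAssignableFrom(c);

    /** Index of the first registered check that accepts the class, or -1 if none does. */
    static int firstAccepting(List<Predicate<Class<?>>> registered, Class<?> c) {
        for (int i = 0; i < registered.size(); i++) {
            if (registered.get(i).test(c)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Predicate<Class<?>>> writableOnly = Arrays.asList(WRITABLE);
        List<Predicate<Class<?>>> javaThenWritable = Arrays.asList(JAVA, WRITABLE);

        // A key that is only Serializable is rejected when just the Writable check is registered.
        System.out.println(firstAccepting(writableOnly, SerializableKey.class));
        // Registering the Java check as well lets the same key through.
        System.out.println(firstAccepting(javaThenWritable, SerializableKey.class));
    }
}
```

If the lookup finds nothing, there is no serializer to copy the key with, which is consistent with a failure inside Serialization.copy. It would also explain why adding JavaSerialization helps a key class that is Serializable but is not recognised as a Writable.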

Hope this helps anyone with a similar problem!

answered 2013-03-19T09:25:26.610

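First, it is worth testing whether serialization/deserialization really works as expected.
Without knowing how you wrote your test, the following simple test works with MRUnit 0.9.0-incubating and JUnit 4.10: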

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.junit.Assert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

public class TestCustom {

    private MapDriver<CustomRecord, Text, CustomRecord, Text> mapDriver;

    private Mapper<CustomRecord, Text, CustomRecord, Text> map = 
        new Mapper<CustomRecord, Text, CustomRecord, Text>();

    private Reducer<CustomRecord, Text, CustomRecord, Text> reduce = 
        new Reducer<CustomRecord, Text, CustomRecord, Text>();

    private ReduceDriver<CustomRecord, Text, CustomRecord, Text> reduceDriver
        = ReduceDriver.newReduceDriver(reduce);

    private MapReduceDriver<CustomRecord, Text, CustomRecord, 
      Text, CustomRecord, Text> mapReduceDriver;

    private Configuration conf = new Configuration();

    //test data
    private Pair<CustomRecord, Text> data;

    //shuffled and sorted data
    private static List<Pair<CustomRecord, List<Text>>> shuffledData; 

    @Before
    public void init() {

        mapDriver = MapDriver.newMapDriver(map);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(map, reduce);
        mapDriver.withConfiguration(conf);
        initData();
    }

    private void initData() {

        CustomRecord key = new CustomRecord("first", 1);
        Text value = new Text("key1");
        data = new Pair<CustomRecord, Text>(key, value);
    }

    @Test
    public void testMapper() throws IOException {

        mapDriver.withInput(data);
        //expected output result
        mapDriver.withOutput(data); 
        mapDriver.runTest(true);

        //shuffle and sort
        List<Pair<CustomRecord, Text>> pairs = 
          new ArrayList<Pair<CustomRecord, Text>>();
        pairs.add(data);

        shuffledData = mapReduceDriver.shuffle(pairs);

    }

    @Test
    public void testReducer() throws IOException {

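        // shuffle here as well, so this test does not rely on testMapper() having run first
        List<Pair<CustomRecord, Text>> pairs = 
          new ArrayList<Pair<CustomRecord, Text>>();
        pairs.add(data);

        // feed input to one single reduce call
        Pair<CustomRecord, List<Text>> pair = mapReduceDriver.shuffle(pairs).get(0);
        reduceDriver.withInput(pair.getFirst(), pair.getSecond());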

        //reducer's output
        List<Pair<CustomRecord, Text>> result = reduceDriver.run();

        Assert.assertEquals("Key mismatch!", 
          data.getFirst(), result.get(0).getFirst());
        Assert.assertEquals("Value mismatch!", 
          data.getSecond(), result.get(0).getSecond());
    }
}

It tests the identity mapper and reducer, using a custom Writable (CustomRecord) as the key.
Note that the key implements WritableComparable and overrides hashCode and equals.
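
The round-trip check suggested at the top of this answer can be sketched without MRUnit at all. The class below is a hypothetical stand-in for CustomRecord: the post does not show that class, so the fields (a String and an int, inferred from new CustomRecord("first", 1)) are assumptions. To compile without the Hadoop jar it implements only Comparable; in a real project it would presumably declare implements WritableComparable<CustomRecord> with the same write and readFields bodies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class CustomRecordSketch implements Comparable<CustomRecordSketch> {

    // Assumed fields; the real CustomRecord is not shown in the post.
    private String name = "";
    private int id;

    /** No-arg constructor, so an empty instance can be created before readFields fills it. */
    public CustomRecordSketch() {}

    public CustomRecordSketch(String name, int id) {
        this.name = name;
        this.id = id;
    }

    /** Same signature as Writable.write: fields go out in a fixed order. */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(id);
    }

    /** Same signature as Writable.readFields: fields come back in the order they were written. */
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        id = in.readInt();
    }

    @Override
    public int compareTo(CustomRecordSketch other) {
        int byName = name.compareTo(other.name);
        return byName != 0 ? byName : Integer.compare(id, other.id);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CustomRecordSketch)) return false;
        CustomRecordSketch other = (CustomRecordSketch) o;
        return id == other.id && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return 31 * name.hashCode() + id;
    }

    /** Serializes the record to bytes and reads it back into a fresh instance. */
    static CustomRecordSketch roundTrip(CustomRecordSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            CustomRecordSketch copy = new CustomRecordSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        CustomRecordSketch key = new CustomRecordSketch("first", 1);
        CustomRecordSketch copy = roundTrip(key);
        // The copy is a different object but should compare equal to the original.
        System.out.println(key.equals(copy) && key.compareTo(copy) == 0);
    }
}
```

If a round trip like this fails, the problem is in the key itself. If it passes, the next thing to check is the test setup, such as the driver configuration.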

answered 2013-03-18T21:45:05.937