
How do you use the MultipleOutputs class in a reducer to write multiple outputs, each of which can have its own unique configuration? There is some documentation in the MultipleOutputs javadoc, but it seems limited to Text outputs. It turns out that MultipleOutputs can handle the output path, key class and value class of each output, but attempts to use output formats that require the use of other configuration properties fail.

(This question has come up several times but my attempts to answer it have been thwarted because the asker actually had a different problem. Since this question has taken more than a few days of investigation for me to answer, I'm answering my own question here as suggested by this Meta Stack Overflow question.)


2 Answers


I've crawled through the MultipleOutputs implementation and found that it doesn't support any OutputFormat whose configuration goes beyond the output directory, key class and value class. I tried to write my own MultipleOutputs class, but that failed because it needs to call a private method somewhere in the Hadoop classes.

I'm left with one workaround that seems to work for all combinations of output formats and configurations: write subclasses of the OutputFormat classes that I want to use (these turn out to be reusable). Each subclass understands that other OutputFormats are in use concurrently and knows how to store away its own properties under a per-output prefix. The design exploits the fact that an OutputFormat can be configured with the context just before being asked for its RecordWriter.

I've got this to work with Cassandra's ColumnFamilyOutputFormat:

package com.myorg.hadoop.platform;

import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

public abstract class ConcurrentColumnFamilyOutputFormat 
                        extends ColumnFamilyOutputFormat 
                        implements Configurable {

private static String[] propertyName = {
        "cassandra.output.keyspace" ,
        "cassandra.output.keyspace.username" ,
        "cassandra.output.keyspace.passwd" ,
        "cassandra.output.columnfamily" ,
        "cassandra.output.predicate",
        "cassandra.output.thrift.port" ,
        "cassandra.output.thrift.address" ,
        "cassandra.output.partitioner.class"
        };

private Configuration configuration;

public ConcurrentColumnFamilyOutputFormat() {
    super();
}

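@Override
public Configuration getConf() {
    return configuration;
}

// Task side: called when the OutputFormat is instantiated with the job's
// configuration. Copies the values saved under this output's prefix back
// to the property names that ColumnFamilyOutputFormat actually reads.
@Override
public void setConf(Configuration conf) {

    configuration = conf;

    String prefix = "multiple.outputs." + getMultiOutputName() + ".";

    for (int i = 0; i < propertyName.length; i++) {
        String property = prefix + propertyName[i];
        String value = conf.get(property);
        if (value != null) {
            conf.set(propertyName[i], value);
        }
    }

}

// Job-setup side: call this once the output has been configured the normal
// way. Saves the current values under this output's prefix so that
// configuring another output cannot overwrite them.
public void configure(Configuration conf) {

    String prefix = "multiple.outputs." + getMultiOutputName() + ".";

    for (int i = 0; i < propertyName.length; i++) {
        String property = prefix + propertyName[i];
        String value = conf.get(propertyName[i]);
        if (value != null) {
            conf.set(property, value);
        }
    }

}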

public abstract String getMultiOutputName();

}

For each Cassandra (in this case) output you want for your reducer, you'd have a class:

package com.myorg.multioutput.ReadCrawled;

import com.myorg.hadoop.platform.ConcurrentColumnFamilyOutputFormat;

public class StrongOutputFormat extends ConcurrentColumnFamilyOutputFormat {

    public StrongOutputFormat() {
        super();
    }

    @Override
    public String getMultiOutputName() {
        return "Strong";
    }

}

and you'd configure it in your mapper/reducer configuration class:

    // This is how you'd normally configure the ColumnFamilyOutputFormat

ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Partner", "Strong");
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");

    // This is how you tell the MultipleOutput-aware OutputFormat that
    // it's time to save off the configuration so no other OutputFormat
    // steps all over it.

new StrongOutputFormat().configure(job.getConfiguration());

    // This is where we add the MultipleOutput-aware ColumnFamilyOutputFormat
    // to our set of outputs

MultipleOutputs.addNamedOutput(job, "Strong", StrongOutputFormat.class, ByteBuffer.class, List.class);

Just to give another example, the MultipleOutputs-aware subclass for FileOutputFormat uses these properties:

    private static String[] propertyName = {
        "mapred.output.compression.type" ,
        "mapred.output.compression.codec" ,
        "mapred.output.compress" ,
        "mapred.output.dir"
        };

and would be implemented just like ConcurrentColumnFamilyOutputFormat above, except that it would use the properties listed here.
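To see the save-and-restore mechanics in isolation, here is a minimal Hadoop-free sketch. It is only an illustration under stated assumptions: a plain Map<String, String> stands in for Hadoop's Configuration, the class name StashRestoreSketch and the methods stash() and restore() are hypothetical (they mirror configure() and setConf() above), and "Weak" is an invented second output name.

```java
import java.util.HashMap;
import java.util.Map;

// Hadoop-free sketch: a Map<String, String> stands in for Hadoop's Configuration.
// StashRestoreSketch, stash() and restore() are hypothetical names for illustration.
public class StashRestoreSketch {

    // Same prefix scheme as setConf()/configure() in the answer.
    static String prefixFor(String outputName) {
        return "multiple.outputs." + outputName + ".";
    }

    // Job-setup side (mirrors configure()): copy the live values under a
    // per-output prefix so that configuring the next output cannot overwrite them.
    static void stash(Map<String, String> conf, String outputName, String[] propertyNames) {
        String prefix = prefixFor(outputName);
        for (String name : propertyNames) {
            String value = conf.get(name);
            if (value != null) {
                conf.put(prefix + name, value);
            }
        }
    }

    // Task side (mirrors setConf()): copy the stashed values back to the
    // property names that the underlying OutputFormat reads.
    static void restore(Map<String, String> conf, String outputName, String[] propertyNames) {
        String prefix = prefixFor(outputName);
        for (String name : propertyNames) {
            String value = conf.get(prefix + name);
            if (value != null) {
                conf.put(name, value);
            }
        }
    }

    public static void main(String[] args) {
        String[] names = {"cassandra.output.columnfamily"};
        Map<String, String> conf = new HashMap<>();

        // Configure the first output, then stash it.
        conf.put("cassandra.output.columnfamily", "Strong");
        stash(conf, "Strong", names);

        // Configuring the second output overwrites the shared property...
        conf.put("cassandra.output.columnfamily", "Weak");
        stash(conf, "Weak", names);

        // ...but each output can recover its own value before it is used.
        restore(conf, "Strong", names);
        System.out.println(conf.get("cassandra.output.columnfamily")); // prints "Strong"
        restore(conf, "Weak", names);
        System.out.println(conf.get("cassandra.output.columnfamily")); // prints "Weak"
    }
}
```

The point of the design is that the shared property names are treated as scratch space: whichever output is about to create its RecordWriter first copies its own prefixed values back over them.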

answered 2012-11-26T23:15:05.843

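I have implemented MultipleOutputs support for Cassandra (see this JIRA ticket); it is currently scheduled for release in 1.2. If you need it now, you can apply the patch in the ticket. Also have a look at the presentation on the topic, which gives examples of its usage.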

answered 2012-11-28T14:49:39.200