9

我有一个 hadoop 流作业,其输出不包含键/值对。您可以将其视为仅值对或仅键对。

我的流式减速器(一个 php 脚本)正在输出由换行符分隔的记录。Hadoop 流将其视为没有值的键,并在换行符之前插入一个制表符。这个额外的标签是不需要的。

如何删除它?

我将 hadoop 1.0.3 与 AWS EMR 一起使用。我下载了 hadoop 1.0.3 的源代码,并在 hadoop-1.0.3/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java 中找到了这段代码:

reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");

因此,我尝试将-D stream.reduce.output.field.separator=其作为参数传递给该工作,但没有运气。我也尝试过-D mapred.textoutputformat.separator=-D mapreduce.output.textoutputformat.separator=但没有运气。

我当然搜索过谷歌,但没有发现任何工作。一个搜索结果甚至表明没有可以传递的参数来实现所需的结果(尽管在这种情况下,hadoop 版本真的很旧)。

这是我的代码(添加了换行符以提高可读性):

hadoop jar streaming.jar -files s3n://path/to/a/file.json#file.json
    -D mapred.output.compress=true -D stream.reduce.output.field.separator=
    -input s3n://path/to/some/input/*/* -output hdfs:///path/to/output/dir
    -mapper 'php my_mapper.php' -reducer 'php my_reducer.php'
4

3 回答 3

10

由于对其他人有帮助,使用上面的提示,我能够进行实施:

CustomOutputFormat<K, V> extends org.apache.hadoop.mapred.TextOutputFormat<K, V> {....}

'getRecordWriter' 的内置实现中只有一行更改为:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); 

代替:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t"); 

在将其编译成 Jar 并将其包含到我的 hadoop 流式调用中(通过 hadoop 流式处理的说明)后,调用看起来像:

hadoop   jar  /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar     \
-archives 'hdfs:///user/the/path/to/your/jar/onHDFS/theNameOfTheJar.jar' \
-libjars theNameOfTheJar.jar \
-outputformat com.yourcompanyHere.package.path.tojavafile.CustomOutputFormat  \
-file yourMapper.py    -mapper  yourMapper.py     \
-file yourReducer.py   -reducer yourReducer.py    \
-input $yourInputFile    \
-output $yourOutputDirectoryOnHDFS

我还将 jar 包含在我发出该调用的文件夹中。

它非常适合我的需求(并且它在减速器之后的行尾没有创建标签)。


更新:基于暗示这确实对其他人有帮助的评论,这是我的 CustomOutputFormat.java 文件的完整来源:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class CustomOutputFormat<K, V> extends TextOutputFormat<K, V> {

    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
        Progressable progress) throws IOException {
    boolean isCompressed = getCompressOutput(job);

    //Channging the default from '\t' to blank
    String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); // '\t'
    if (!isCompressed) {
        Path file = FileOutputFormat.getTaskOutputPath(job, name);
        FileSystem fs = file.getFileSystem(job);
        FSDataOutputStream fileOut = fs.create(file, progress);
        return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
        Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
            GzipCodec.class);
        // create the named codec
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
        // build the filename including the extension
        Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
        FileSystem fs = file.getFileSystem(job);
        FSDataOutputStream fileOut = fs.create(file, progress);
        return new LineRecordWriter<K, V>(new DataOutputStream(
            codec.createOutputStream(fileOut)), keyValueSeparator);
    }
    }
}

仅供参考:对于您的使用上下文,请务必检查这不会对映射器和减速器之间的 hadoop-streaming 托管交互(在分离键与值方面)产生不利影响。澄清:

  • 根据我的测试——如果你的数据的每一行都有一个“标签”(每边都有一些东西),你可以保留内置的默认值:流式传输会将第一个标签之前的第一件事解释为你的“关键”,以及它之后的那一行作为你的“价值”。因此,它看不到“空值”,也不会附加显示在减速器之后的选项卡。(您会看到最终输出按“键”的值排序,流在每一行中解释为它看到的每个选项卡之前发生的内容。)

  • 相反,如果您的数据中没有选项卡,并且您没有使用上述技巧覆盖默认值,那么您将在运行完成后看到选项卡,上述覆盖成为修复。

于 2013-08-15T01:09:10.933 回答
8

查看 org.apache.hadoop.mapreduce.lib.output.TextOutputFormat 源代码,我看到了两件事:

  1. write(key,value)如果键或值不为空,该方法将写入分隔符
  2. 始终使用默认值 ( \t) 设置分隔符,当mapred.textoutputformat.separator返回 null (我假设发生-D stream.reduce.output.field.separator=

您唯一的解决方案可能是编写自己的 OutputFormat 来解决这两个问题。

我的测试

在我的任务中,我想重新格式化从

id1|val1|val2|val3
id1|val1

进入:

id1|val1,val2,val3
id2|val1

我有一个自定义映射器(Perl 脚本)来转换行。对于这项任务,我最初尝试将其作为仅键(或仅值)输入,但使用尾随制表符获得了结果。

起初我只是指定:

-D stream.map.input.field.separator='|' -D stream.map.output.field.separator='|'

这给了映射器一个键值对,因为我的映射无论如何都需要一个键。但是这个输出现在在第一个字段之后有标签

添加时我得到了所需的输出:

-D mapred.textoutputformat.separator='|'

如果我没有设置或设置为空白

-D mapred.textoutputformat.separator=

然后我会在第一个字段之后再次获得一个标签。

一旦我查看了 TextOutputFormat 的源代码,这很有意义

于 2013-08-09T17:12:58.357 回答
2

我也有这个问题。我正在使用一个 python,仅限地图的工作,基本上只是发出 CSV 数据行。检查输出后,我注意到每行末尾的 \t 。

 foo,bar,baz\t

我发现映射器和 Python 流都在处理键值对。如果您不发出默认分隔符,则整行 CSV 数据将被视为“键”,并且需要键和值的框架会拍打 \t 和空值。

由于我的数据本质上是一个 CSV 字符串,因此我将流和映射输出的分隔符​​设置为逗号。该框架将第一个逗号之前的所有内容作为键读取,将第一个逗号之后的所有内容作为值读取。然后,当它把结果写到文件中时,它写了关键的逗号值,它有效地创建了我想要的输出。

 foo,bar,baz

就我而言,我添加了以下内容以防止框架将 \t 添加到我的 csv 输出的末尾...

-D mapred.reduce.tasks=0 \
-D stream.map.output.field.separator=, \
-D mapred.textoutputformat.separator=, \
于 2016-07-07T23:21:58.280 回答