I've started using Cascading on Amazon EMR and have managed to get it running, but I've hit a fairly simple hurdle that I hope someone can shed some light on.

My code:

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Each;
import cascading.tap.SinkMode;

public class Main
  {
  public static void
  main( String[] args )
    {
    String inPath = args[ 0 ];
    String outPath = args[ 1 ];

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create the source tap
    TextLine sourceScheme = new TextLine(new Fields("line"));
    Tap inTap = new Hfs( sourceScheme, inPath );

    // create the sink tap
    TextLine sinkScheme = new TextLine( new Fields("custid", "movieids"));
    Tap outTap = new Hfs( sinkScheme, outPath, SinkMode.REPLACE );

    Fields filmFields = new Fields("custid", "movieids");

    String filmRegex = "([0-9]:*[,.]*)";

    RegexParser parser = new RegexParser(filmFields, filmRegex);

    Pipe importPipe = new Each("import", new Fields("line"), parser, Fields.RESULTS );

    // connect the taps, pipes, etc., into a flow
    Flow parsedFlow = flowConnector.connect(inTap, outTap, importPipe);

    // run the flow
    parsedFlow.start();
    parsedFlow.complete();
    }
  }

My input (without the blank lines):

1:2

2:4

5:1

3:9

My output:

Task TASKID="task_201305241444_0003_m_000000" TASK_TYPE="MAP" TASK_STATUS="FAILED" FINISH_TIME="1369408133954" ERROR="cascading.tuple.TupleException: operation added the wrong number of fields, expected: ['custid', 'movieids'], got result size: 1
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:82)
    at cascading.operation.regex.RegexParser.onFoundGroups(RegexParser.java:168)
    at cascading.operation.regex.RegexParser.operate(RegexParser.java:151)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
    at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
    at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
    at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

The regex works on http://regexpal.com/

Many thanks,

Duncan

1 Answer

You get the exception because your regex produces a single result where two result fields are expected (i.e. "custid" and "movieids"): the regex contains only one group (...).
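The mismatch can be checked outside Cascading. Below is a minimal sketch using only java.util.regex; the class GroupCountCheck and its method are hypothetical names for illustration, not part of Cascading. It counts the capturing groups each pattern defines, which is the number the error message compares against the two declared fields.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Cascading API.
final class GroupCountCheck {

    // Returns the number of capturing groups the pattern defines.
    static int groupCount(String regex) {
        // An empty input is enough: groupCount() depends only on the pattern.
        return Pattern.compile(regex).matcher("").groupCount();
    }

    public static void main(String[] args) {
        // Regex from the question: one pair of parentheses, so one group.
        System.out.println(groupCount("([0-9]:*[,.]*)"));
        // Regex with one group per declared field.
        System.out.println(groupCount("(\\d):(\\d)"));
    }
}
```

If the count differs from the number of declared fields, an error like the one in the question is the likely result.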

If you just want to split at the colon, use an expression with 2 groups, for example:

String filmRegex = "(\\d):(\\d)";

Use \d+ instead of \d if your numbers can have more than one digit.
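As a rough illustration of the multi-digit variant, here is a sketch in plain java.util.regex. The class MultiDigitParse and its parse method are hypothetical names, and the sketch assumes the whole line must match (it uses matches()), which may differ from how RegexParser applies the pattern.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Cascading API.
final class MultiDigitParse {

    // One group per field; \d+ accepts one or more digits.
    private static final Pattern LINE = Pattern.compile("(\\d+):(\\d+)");

    // Returns {custid, movieids} for a line such as "12:345",
    // or null if the whole line does not match the pattern.
    static String[] parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return null;
        }
        return new String[] { m.group(1), m.group(2) };
    }

    public static void main(String[] args) {
        String[] fields = parse("12:345");
        System.out.println(fields[0] + " / " + fields[1]);
    }
}
```

With the single-digit pattern (\d):(\d), a line such as 12:345 would not match as a whole, which is why \d+ is the safer choice once the IDs grow beyond one digit.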

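Or, even simpler, use the TextDelimited scheme as the input scheme, which automatically splits the input data into its fields when reading from the file: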

Scheme sourceScheme = new TextDelimited(new Fields("custid", "movieids"), ":");
answered 2013-05-24T19:59:26.553