4

我正在使用 Cascading 2 创建 Hadoop 作业,并尝试创建一个从单个源开始的流。在将几个函数应用于数据后,我需要拆分流,以便使用此数据创建两个单独的报告(在两个单独的接收器中)。

    //SOURCE
    Scheme sourceScheme = new TextLine( new Fields( "line" ) );
    Tap source = new Hfs( sourceScheme, input );

    //REPORT1 SINK
    Scheme report1SinkScheme = new TextDelimited( Fields.ALL, ",","\"" );
    Tap report1Sink = new Hfs( report1SinkScheme, output1, SinkMode.REPLACE );

    //REPORT2 SINK
    Scheme report2SinkScheme = new TextDelimited( Fields.ALL, ",","\"" );
    Tap report2Sink = new Hfs( report2SinkScheme, output2, SinkMode.REPLACE );

    //INITIAL FUNCTIONS
    Pipe firstPipe = new Pipe("firstPipe");
    firstPipe = new Each(firstPipe, new Fields("line"), functionA);
    firstPipe = new Each(firstPipe, functionB, Fields.ALL);

    //REPORT1 FUNCTION
    report1Pipe = new Each(firstPipe, Fields.ALL, function1, Fields.RESULTS);

    //REPORT2 FUNCTION
    report2Pipe = new Each(firstPipe, Fields.ALL, function2, Fields.RESULTS);

    //CONNECT FLOW PARTS
    FlowDef flowDef = new FlowDef()
    .setName("report-flow")
    .addSource(firstPipe, source)
    .addSink(report1Pipe, report1Sink)
    .addSink(report2Pipe, report2Sink);

    new HadoopFlowConnector( properties ).connect( flowDef ).complete();

目前,这给了我错误“java.lang.IllegalArgumentException:无法添加重复的接收器:firstPipe”,但即使在弄乱了一段时间之后,我也遇到了与流程设置有关的各种其他问题。

有人可以解释如何构造这种形式的流(一个源,两个汇)吗?我需要创建一个 Cascade 吗?还是在拆分之前需要一个中间接收器来保存数据?

请帮忙!

4

2 回答 2

5

您可以使用 Cascading 文档中提到的拆分模式。这是一个例子:

public static void main(String[] args) {
    // source and sink
    Scheme sourceScheme = new TextLine(new Fields("line"));
    Tap source = new FileTap(sourceScheme, args[0]);

    Fields sinkFields = new Fields("word", "count");
    Scheme sinkScheme = new TextLine(sinkFields, sinkFields);
    Tap sink_one = new FileTap(sinkScheme, "out-one.txt");
    Tap sink_two = new FileTap(sinkScheme, "out-two.txt");

    // the pipe assembly
    Pipe assembly = new Pipe("wordcount");

    String regex = "\\w+";
    Function function = new RegexGenerator(new Fields("word"), regex);
    assembly = new Each(assembly, new Fields("line"), function);

    Aggregator count = new Count(new Fields("count"));

    // ...split into two pipes
    Pipe countOne = new Pipe("count-one", assembly);
    countOne = new GroupBy(countOne, new Fields("word"));
    countOne = new Every(countOne, count);

    Pipe countTwo = new Pipe("count-two", assembly);
    countTwo = new GroupBy(countTwo, new Fields("word"));
    countTwo = new Every(countTwo, count);

    // create the flow
    final List<Pipe> pipes = new ArrayList<Pipe>(2);
    pipes.add(countOne);
    pipes.add(countTwo);

    final Map<String, Tap> sinks = new HashMap<String, Tap>();
    sinks.put("count-one", sink_one);
    sinks.put("count-two", sink_two);

    FlowConnector flowConnector = new LocalFlowConnector();
    Flow flow = flowConnector.connect(source, sinks, pipes);

    flow.complete();
}
于 2012-09-04T00:58:05.290 回答
4

拆分模式位于级联用户指南中: http ://docs.cascading.org/cascading/2.1/userguide/htmlsingle/#N21362

另一个(更简单的)示例包含在“不耐烦的级联”中,第 5 部分和第 6 部分:

关于上面显示的代码的一点是,它似乎缺少 和 的变量report1Pipe定义report2Pipe。要使用拆分模式,每个分支都需要一个名称,并且名称需要不同。

抛出异常是因为有两个分支都从管道组件的早期继承了相同的名称。因此,例如,这些flowDef.addSink(..)调用对于流程规划者来说是模棱两可的。

所以在“不耐烦”的第 5 部分中,看看“D”、“DF”和“TF”分支是如何在操作中命名的。

Cascading 要求这种命名似乎有点违反直觉,但当您将故障陷阱调试等附加到特定分支时,它在大型、复杂的工作流程中变得非常重要。

或者,Clojure 中的 Cascalog DSL 更具声明性,因此这由语言直接处理——分支是子查询,陷阱等在子查询的闭包中处理。

于 2013-01-03T05:44:40.587 回答