I'm building a Hadoop job with Cascading 2 and am trying to create a flow that starts from a single source. After applying a few functions to the data, I need to split the stream so I can use this data to produce two separate reports (into two separate sinks).
//SOURCE
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, input );
//REPORT1 SINK
Scheme report1SinkScheme = new TextDelimited( Fields.ALL, ",", "\"" );
Tap report1Sink = new Hfs( report1SinkScheme, output1, SinkMode.REPLACE );
//REPORT2 SINK
Scheme report2SinkScheme = new TextDelimited( Fields.ALL, ",", "\"" );
Tap report2Sink = new Hfs( report2SinkScheme, output2, SinkMode.REPLACE );
//INITIAL FUNCTIONS
Pipe firstPipe = new Pipe("firstPipe");
firstPipe = new Each(firstPipe, new Fields("line"), functionA);
firstPipe = new Each(firstPipe, functionB, Fields.ALL);
//REPORT1 FUNCTION
Pipe report1Pipe = new Each(firstPipe, Fields.ALL, function1, Fields.RESULTS);
//REPORT2 FUNCTION
Pipe report2Pipe = new Each(firstPipe, Fields.ALL, function2, Fields.RESULTS);
//CONNECT FLOW PARTS
FlowDef flowDef = new FlowDef()
.setName("report-flow")
.addSource(firstPipe, source)
.addSink(report1Pipe, report1Sink)
.addSink(report2Pipe, report2Sink);
new HadoopFlowConnector( properties ).connect( flowDef ).complete();
At the moment this gives me the error "java.lang.IllegalArgumentException: cannot add duplicate sink: firstPipe", but even after messing around with it for a while I've hit various other problems related to the flow setup.
Can someone explain how to structure a flow of this form (one source, two sinks)? Do I need to create a Cascade? Or do I need an intermediate sink to hold the data before the split?
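One thing I did try was giving each branch its own name by passing the parent pipe into the Pipe constructor, since I suspected the planner sees both branches as "firstPipe". This is just my attempt based on the docs (the branch names "report1Pipe"/"report2Pipe" are my own), so I'm not sure it's the right pattern:

```java
// Attempted fix (a sketch): give each branch a distinct name so the
// planner doesn't see two sinks bound to the same pipe name "firstPipe".
Pipe report1Pipe = new Pipe( "report1Pipe", firstPipe );
report1Pipe = new Each( report1Pipe, Fields.ALL, function1, Fields.RESULTS );

Pipe report2Pipe = new Pipe( "report2Pipe", firstPipe );
report2Pipe = new Each( report2Pipe, Fields.ALL, function2, Fields.RESULTS );
```

Even with this I wasn't confident the FlowDef wiring (one addSource on firstPipe, one addSink per branch) was correct.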
Please help!