2

如何使用 Apache Apex 创建批处理应用程序?

我发现的所有示例都是流式应用程序,这意味着它们不会结束,我希望我的应用程序在处理完所有数据后关闭。

谢谢

4

2 回答 2

4

你的用例是什么?原生支持批处理已在路线图上,目前正在开发中。

或者,在此之前,一旦您确定您的处理已完成,输入运算符可以发送一个信号作为 ShutdownException(),该信号将通过 DAG 传播并关闭 DAG。

如果您需要更多详细信息,请告诉我们。

于 2016-11-28T14:56:12.410 回答
3

您可以在运行应用程序之前添加退出条件。例如

public void testMapOperator() throws Exception
{
   LocalMode lma = LocalMode.newInstance();
   DAG dag = lma.getDAG();

   NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator());
   FunctionOperator.MapFunctionOperator<Integer, Integer> mapper
    = dag.addOperator("mapper", new  FunctionOperator.MapFunctionOperator<Integer, Integer>(new Square()));
   ResultCollector collector = dag.addOperator("collector", new ResultCollector());

   dag.addStream("raw numbers", numGen.output, mapper.input);
   dag.addStream("mapped results", mapper.output, collector.input);

// Create local cluster
   LocalMode.Controller lc = lma.getController();
   lc.setHeartbeatMonitoringEnabled(false);

 //Condition to exit the application
  ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
  {
     @Override
     public Boolean call() throws Exception
    {
       return TupleCount == NumTuples;
    }
  });

  lc.run();

  Assert.assertEquals(sum, 285);
}

有关完整代码,请参阅https://github.com/apache/apex-malhar/blob/master/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java

于 2016-11-28T12:36:43.173 回答