我为 Apache Flink 创建了一个简单的 Job,它使用 Gelly 提供的 PageRank 实现。
在本地,在 IDE 中运行,一切都很好。但是,我尝试使用 JobManager Web 界面将我的 Job 的 JAR 提交到在我的机器上运行的 Flink 实例。但是,Flink 并没有为 Job 获取正确的计划并执行 PageRank,而是提出并执行了一个非常奇怪的计划,该计划只计算图的顶点数。
我做了一些研究和调试,发现 Gelly 提供的 PageRank 的实现开始计算图的顶点数,当它没有作为参数提供给算法时:
if (numberOfVertices == 0) {
numberOfVertices = network.numberOfVertices();
}
这个计算意味着一个嵌入的工作。由于操作员是惰性的,因此不会触发任何计算。在 Flink 服务器中,首先要做的是获取作业计划。这是由一个特殊的环境来完成的OptimizerPlanEnvironment
,它提供了以下result
方法:
public JobExecutionResult execute(String jobName) throws Exception {
Plan plan = createProgramPlan(jobName);
this.optimizerPlan = compiler.compile(plan);
// do not go on with anything now!
throw new ProgramAbortException();
}
问题来自这里。一旦ProgramAbortException
抛出 ,程序就会返回到目前为止计算的计划。但是只计算了内部作业计划,所以这种方式永远不会计算或执行主作业计划。
这是我使用的代码:
public class Job {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Double, Double> graph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
graph.run(new PageRank<Long>(0.85, 10)).print();
}
private static class VertexInit implements MapFunction<Long, Double> {
@Override
public Double map(Long value) throws Exception { return 1.0; }
}
}
如果提供了顶点数,例如graph.run(new PageRank<Long>(0.85, 5, 10))
,没有问题,则正确计算计划并计算PageRank。
我的问题是:我做错了什么?或者这是 Flink 中的一些实际错误?