4

我正在尝试将a 合并到FlinkSeq[DataSet(Long,Long,Double)]中的单个:DataSet[(Long,Long,Double)]

     val neighbors= graph.map(el => zKnn.neighbors(results,
      el.vector, 150, metric)).reduce(
     (a, b) => a.union(b)
      ).collect()

其中 graph 是一个常规的 scala 集合,但可以转换为 DataSet;结果是一个DataSet[Vector]并且不应该被收集并且在邻居方法中是需要的

我总是得到一个 FlinkRuntime Exeption:

目前无法处理超过 64 个输出的节点。org.apache.flink.optimizer.CompilerException:当前无法处理超过 64 个输出的节点。在 org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:347) 在 org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202

4

1 回答 1

4

Flink 目前不支持超过 64 个输入数据集的联合算子。

作为一种解决方法,您可以分层合并多达 64 个数据集,并在层次结构的各个级别之间注入一个身份映射器。就像是:

DataSet level1a = data1.union(data2.union(data3...(data64))).map(new IDMapper());
DataSet level1b = data65.union(data66...(data128))).map(new IDMapper());
DataSet level2 = level1a.union(level1b)
于 2015-07-24T20:57:52.120 回答