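I have defined a filter as the termination criterion for my k-means implementation. When I run my application, it always computes only one iteration.
I think the problem is here: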
DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
Or maybe it is the filter function:
public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
    private static final long serialVersionUID = 5868635346889117617L;

    @Override
    public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
        // keep only the pairs whose two centroids are equal
        return tuple.f0.equals(tuple.f1);
    }
}
Best regards, Paul
My complete code is here:
public void run() {
    // load properties
    Properties pro = new Properties();
    FileSystem fs = null;
    try {
        pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
        fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new org.apache.hadoop.conf.Configuration());
    } catch (Exception e) {
        e.printStackTrace();
    }
    int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
    String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");

    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    DataSet<GeoTimeDataCenter> centroids = null;
    try {
        centroids = getCentroidDataSet(env);
    } catch (Exception e1) {
        e1.printStackTrace();
    }

    // set number of bulk iterations for KMeans algorithm
    IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);

    DataSet<GeoTimeDataCenter> newCentroids = points
        // compute closest centroid for each point
        .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
        // count and sum point coordinates for each centroid
        .groupBy(0).reduceGroup(new CentroidAccumulator())
        // compute new centroids from point counts and coordinate sums
        .map(new CentroidAverager(this.getBenchmarkCounter()));

    // feed new centroids back into next iteration with termination condition
    DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));

    DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");

    // emit result
    clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
    finalCentroids.writeAsText(outputPath + "/centers"); //print();

    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
    private static final long serialVersionUID = 5868635346889117617L;

    @Override
    public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
        // keep only the pairs whose two centroids are equal
        return tuple.f0.equals(tuple.f1);
    }
}
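To reason about the one-iteration behaviour, here is a plain-Java sketch with no Flink dependency. It assumes the documented semantics of the two-argument closeWith: a bulk iteration stops as soon as the termination criterion data set is empty, or when the maximum number of iterations is reached. Everything in it is hypothetical and only for illustration: the class TerminationSketch, the 1-D centroids stored in a double[] (index = centroid id), the fixed TARGET positions, and the epsilon value. It compares a criterion like the one above (join on all fields, keep equal pairs) with an alternative that pairs centroids by id and keeps those that still moved.

```java
import java.util.function.BiPredicate;

// Plain-Java simulation of a bulk iteration with a termination criterion.
// Assumption: the loop continues only while the criterion is NON-empty.
class TerminationSketch {

    // Hypothetical positions the centroids converge to (stand-in for the real k-means update).
    static final double[] TARGET = {4.0, 4.0};

    // One superstep: move every centroid halfway towards its target.
    static double[] step(double[] centers) {
        double[] next = new double[centers.length];
        for (int i = 0; i < centers.length; i++) {
            next[i] = (centers[i] + TARGET[i]) / 2.0;
        }
        return next;
    }

    // Runs the loop and returns the number of supersteps that were executed.
    static int run(double[] start, int maxIterations,
                   BiPredicate<double[], double[]> criterionNonEmpty) {
        double[] current = start.clone();
        int executed = 0;
        while (executed < maxIterations) {
            double[] next = step(current);
            executed++;
            boolean keepGoing = criterionNonEmpty.test(current, next);
            current = next;
            if (!keepGoing) {
                break; // empty criterion: the iteration terminates
            }
        }
        return executed;
    }

    // Criterion shaped like the question's: pair old and new centroids that are
    // equal on all fields. It is non-empty only if some centroid did not change.
    static boolean anyEqualPair(double[] oldC, double[] newC) {
        for (double n : newC) {
            for (double o : oldC) {
                if (n == o) {
                    return true;
                }
            }
        }
        return false;
    }

    // Alternative: pair centroids by id (array index) and keep those that moved
    // by more than epsilon. It becomes empty once the centroids have converged.
    static boolean anyMoved(double[] oldC, double[] newC, double epsilon) {
        for (int i = 0; i < oldC.length; i++) {
            if (Math.abs(newC[i] - oldC[i]) > epsilon) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        double[] start = {0.0, 8.0};
        // All centroids change in the first superstep, so no equal pair exists.
        System.out.println(run(start, 10, TerminationSketch::anyEqualPair)); // prints 1
        // Movements are 2.0, 1.0, 0.5, so the criterion is empty after the third superstep.
        System.out.println(run(start, 10, (o, n) -> anyMoved(o, n, 0.6)));   // prints 3
    }
}
```

If the real job behaves like this sketch, the equality-based criterion is empty right after the first superstep because every centroid has moved, which would match the single iteration observed. Whether joining on a centroid id and filtering for changed centroids fits GeoTimeDataCenter depends on fields that are not shown here.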