0

我有一个 Flink DataSet(从文件中读取),其中包含来自许多不同传感器的传感器读数。我使用 Flink 的 groupBy() 方法将数据按传感器组织为 UnsortedGrouping。接下来,我想以分布式方式在 DataSet 中的每个 UnsortedGrouping 上运行 KMeans 算法。

我的问题是,如何使用 flink 有效地实现这个功能。以下是我当前的实现:我编写了自己的 groupReduce() 方法,该方法将 flink KMeans 算法应用于每个 UnsortedGrouping。这段代码有效,但看起来很慢并且使用大量内存。

我认为这与我必须做的数据重组量有关。必须执行多次数据转换才能使代码运行,因为我不知道如何更有效地做到这一点:

  • UnsortedGrouping 到 Iterable(groupReduce() 方法的开始)
  • Iterable 到 LinkedList(需要这个才能使用 fromCollection() 方法)
  • LinkedList 到 DataSet(需要作为 KMeans 的输入)
  • 生成的 KMeans 数据集到 LinkedList(以便遍历结果并逐条写入 Collector)

肯定有一种更高效的方法来实现这一点吧?谁能告诉我如何以简洁、高效的 Flink 方式实现它?

// *************************************************************************
// VARIABLES
// *************************************************************************

// Maximum number of initial centroids (clusters) picked per group in DistinctReduce.
static int numberClusters = 10;
// Number of iterations passed to the KMeans bulk iteration in DistinctReduce.
static int maxIterations = 10;
// Modulus applied in main() to the sensor number parsed from each input line.
static int sensorCount = 117;
// Shared execution environment; used by main() and also by DistinctReduce.reduce().
static ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// *************************************************************************
// PROGRAM
// *************************************************************************

/**
 * Reads the input file, keeps the value lines that do not contain "#string", turns each
 * line into a (sensor id, point) pair, keeps sensor ids below 10, groups the pairs by
 * sensor id, applies {@link DistinctReduce} to every group and prints the result followed
 * by the elapsed wall-clock time.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if reading the input or executing the job fails
 */
public static void main(String[] args) throws Exception {

    final long startTime = System.currentTimeMillis();

    String fileName = "C:/tmp/data.nt";
    DataSet<String> text = env.readTextFile(fileName);

    // Keep only the lines that describe a value literal ...
    DataSet<String> valueLines = text
            .filter(line -> line.contains("Value") && line.contains("valueLiteral"));
    // ... and drop every line that contains "#string".
    DataSet<String> numericLines = valueLines
            .filter(line -> !line.contains("#string"));

    // Parse each line into (sensor id, reading).
    DataSet<Tuple2<Integer, Point>> readings = numericLines
            .map(line -> {
                // The sensor number sits between the first "_" and the first ">".
                int idStart = line.indexOf("_") + 1;
                int idEnd = line.indexOf(">");
                int sensorId = Integer.parseInt(line.substring(idStart, idEnd)) % sensorCount;
                // The reading is the text between the first pair of double quotes.
                double value = Double.parseDouble(line.split("\"")[1]);
                return new Tuple2<Integer, Point>(sensorId, new Point(value));
            });

    // Restrict to sensor ids below 10 and group by the sensor id (tuple field 0).
    UnsortedGrouping<Tuple2<Integer, Point>> points = readings
            .filter(reading -> reading.f0 < 10)
            .groupBy(0);

    DataSet<Tuple2<Integer, Point>> output = points.reduceGroup(new DistinctReduce());
    output.print();

    // Report the execution time.
    final long endTime = System.currentTimeMillis();
    System.out.println("Total execution time: " + (endTime - startTime) + "ms");
}

/**
 * Group-reduce function that runs KMeans over the points of a single group and emits
 * every point of the group together with the result of {@code SelectNearestCenter}.
 *
 * <p>NOTE(review): this builds and executes a nested Flink plan through the static
 * {@code env} from inside a user function. That presumably only works in local
 * execution and is the likely cause of the reported slowness and memory use; confirm
 * before running on a cluster.
 */
public static class DistinctReduce implements GroupReduceFunction<Tuple2<Integer, Point>, Tuple2<Integer, Point>> {

    private static final long serialVersionUID = 1L;

    /**
     * Clusters the points of one group and forwards the clustered points.
     *
     * @param in  all (group key, point) pairs of one group
     * @param out collector receiving one tuple per clustered point; field 0 is whatever
     *            {@code SelectNearestCenter} returns (presumably the centroid id, not
     *            the original group key; verify against that class)
     * @throws Exception if the nested KMeans plan fails
     */
    @Override
    public void reduce(Iterable<Tuple2<Integer, Point>> in, Collector<Tuple2<Integer, Point>> out) throws Exception {

        // Generates consecutive ids (1, 2, ...) for the initial centroids.
        AtomicInteger counter = new AtomicInteger(0);
        List<Point> pointsList = new LinkedList<Point>();

        // Materialize the group; each point is copied into a fresh Point instance.
        for (Tuple2<Integer, Point> t : in) {
            pointsList.add(new Point(t.f1.x));
        }
        DataSet<Point> points = env.fromCollection(pointsList);

        // Use up to numberClusters distinct points as the initial centroids.
        DataSet<Centroid> centroids = points
                .distinct()
                .first(numberClusters)
                .map(x -> new Centroid(counter.incrementAndGet(), x));

        IterativeDataSet<Centroid> loop = centroids.iterate(maxIterations);
        DataSet<Centroid> newCentroids = points
                // compute closest centroid for each point
                .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
                // count and sum point coordinates for each centroid
                .map(new CountAppender())
                .groupBy(0)
                .reduce(new CentroidAccumulator())
                // compute new centroids from point counts and coordinate sums
                .map(new CentroidAverager());

        // feed new centroids back into next iteration
        DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

        // assign points to final clusters
        DataSet<Tuple2<Integer, Point>> clusteredPoints = points
                .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");

        // Execute the nested plan exactly once. print() and collect() each trigger an
        // execution of their own, so calling both would run the whole KMeans iteration
        // twice; instead collect once and print from the collected list.
        List<Tuple2<Integer, Point>> clusteredPointsList = clusteredPoints.collect();
        for (Tuple2<Integer, Point> t : clusteredPointsList) {
            System.out.println(t);
            out.collect(t);
        }
    }
}
4

1 回答 1

0

您必须首先对数据点和质心进行分组。然后迭代质心并将它们与数据点共同分组。对于组中的每个点,您将其分配给最近的质心。然后对初始组索引和质心索引进行分组,以减少分配给同一质心的所有点。这将是一次迭代的结果。

代码可能如下所示:

// (group key, point) pairs; how they are produced is elided in this sketch.
DataSet<Tuple2<Integer, Point>> groupedPoints = ...

// (group key, initial centroid) pairs; how they are produced is elided in this sketch.
DataSet<Tuple2<Integer, Centroid>> groupCentroids = ...

// Bulk iteration over the per-group centroids, limited to 10 iterations.
IterativeDataSet<Tuple2<Integer, Centroid>> groupLoop = groupCentroids.iterate(10);

DataSet<Tuple2<Integer, Centroid>> newGroupCentroids = groupLoop
    // Bring the centroids and the points of the same group together.
    .coGroup(groupedPoints).where(0).equalTo(0).with(new CoGroupFunction<Tuple2<Integer,Centroid>, Tuple2<Integer,Point>, Tuple4<Integer, Integer, Point, Integer>>() {
        /**
         * Assigns every point of a group to its nearest centroid of the same group and
         * emits (centroid id, group key, point, 1) for each point.
         */
        @Override
        public void coGroup(Iterable<Tuple2<Integer, Centroid>> centroidsIterable, Iterable<Tuple2<Integer, Point>> points, Collector<Tuple4<Integer, Integer, Point, Integer>> out) throws Exception {
            // Cache the centroids once, up front: they are scanned again for every point.
            List<Tuple2<Integer, Centroid>> centroids = new ArrayList<>();
            for (Tuple2<Integer, Centroid> centroidTuple : centroidsIterable) {
                centroids.add(centroidTuple);
            }

            for (Tuple2<Integer, Point> pointTuple : points) {
                double minDistance = Double.MAX_VALUE;
                // NOTE(review): stays -1 when the group has no centroids, in which case
                // the point is emitted with centroid id -1; confirm this is intended.
                int minIndex = -1;
                Point point = pointTuple.f1;

                // Linear scan for the closest centroid.
                for (Tuple2<Integer, Centroid> centroidTuple : centroids) {
                    Centroid centroid = centroidTuple.f1;
                    double distance = point.euclideanDistance(centroid);

                    if (distance < minDistance) {
                        minDistance = distance;
                        minIndex = centroid.id;
                    }
                }

                out.collect(Tuple4.of(minIndex, pointTuple.f0, point, 1));
            }
        }
    })
    // Sum the coordinates and the counts of all points sharing (centroid id, group key).
    .groupBy(0, 1).reduce(new ReduceFunction<Tuple4<Integer, Integer, Point, Integer>>() {
        @Override
        public Tuple4<Integer, Integer, Point, Integer> reduce(Tuple4<Integer, Integer, Point, Integer> value1, Tuple4<Integer, Integer, Point, Integer> value2) throws Exception {
            return Tuple4.of(value1.f0, value1.f1, value1.f2.add(value2.f2), value1.f3 + value2.f3);
        }
    })
    // Divide the coordinate sum by the point count to obtain the new centroid.
    .map(new MapFunction<Tuple4<Integer,Integer,Point,Integer>, Tuple2<Integer, Centroid>>() {
        @Override
        public Tuple2<Integer, Centroid> map(Tuple4<Integer, Integer, Point, Integer> value) throws Exception {
            return Tuple2.of(value.f1, new Centroid(value.f0, value.f2.div(value.f3)));
        }
    });

// Feed the recomputed centroids back into the next iteration.
DataSet<Tuple2<Integer, Centroid>> result = groupLoop.closeWith(newGroupCentroids);
于 2017-02-03T10:46:22.940 回答