2

我有一个适用于JavaDStreams对象的应用程序。这是一段代码,我在其中计算单词出现的频率。

JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

现在,如果我希望打印按整数值排序的前 N ​​个频繁元素,如果没有类似sortByKey的方法(用于 JavaPairRDD),我该怎么做?

4

2 回答 2

3

由于您已经JavaPairDStream<String, Integer>并且想要按整数值排序,因此您必须先交换对。

JavaPairDStream<Integer,String> swappedPair = wordCounts.mapToPair(x -> x.swap());

transformToPair现在您可以通过使用和使用sortByKey功能进行排序。

JavaPairDStream<Integer,String> sortedStream = swappedPair.transformToPair(
     new Function<JavaPairRDD<Integer,String>, JavaPairRDD<Integer,String>>() {
         @Override
         public JavaPairRDD<Integer,String> call(JavaPairRDD<Integer,String> jPairRDD) throws Exception {
                    return jPairRDD.sortByKey(false);
                  }
              });

sortedStream.print();
于 2017-06-11T06:09:16.933 回答
0

简化:

  JavaPairDStream<String, Long> counts = lines.countByValue();
  JavaPairDStream<Long,String> swappedPair = counts.mapToPair(Tuple2::swap);  
  JavaPairDStream<Long,String> sortedStream = swappedPair.transformToPair(s -> s.sortByKey(false));
于 2017-08-28T06:52:27.287 回答