
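I'm new to Spark Streaming and I've been trying to figure out how to handle this problem. I've found plenty of examples for single (K,V) pairs, but nothing beyond that. I would appreciate help finding the best way to do these transformations using Spark with Java.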

Let me briefly describe the scenario.

The goal is to get the error rate of a set of elements within a time window.

Given the following input:

(A, Error)
(B, Success)
(B, Error)
(B, Success)
(C, Success)
(C, Error)

The data is aggregated by element and then by status, as (Element, (Number of Success, Number of Error)). In this case, the result of the transformation would be:

(A, (0,1))
(B, (2,1))
(C, (1,1))
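As a sketch of the aggregation step only, the following plain Java (no Spark) counts successes and errors per element for the input above. The class and method names are hypothetical, and a `Map<String, int[]>` stands in for the `(Element, (Number of Success, Number of Error))` pairs.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Spark) of the aggregation step:
// (element, status) events -> element -> {successCount, errorCount}.
public class StatusCounts {

    // Counts "Success" and "Error" per element; any other status is ignored.
    static Map<String, int[]> aggregate(List<String[]> events) {
        Map<String, int[]> counts = new LinkedHashMap<>(); // keeps first-seen order A, B, C
        for (String[] e : events) {
            int[] c = counts.computeIfAbsent(e[0], k -> new int[2]);
            if ("Success".equals(e[1])) {
                c[0]++; // index 0: number of successes
            } else if ("Error".equals(e[1])) {
                c[1]++; // index 1: number of errors
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
                new String[] {"A", "Error"},
                new String[] {"B", "Success"},
                new String[] {"B", "Error"},
                new String[] {"B", "Success"},
                new String[] {"C", "Success"},
                new String[] {"C", "Error"});
        // Prints (A, (0,1)), (B, (2,1)), (C, (1,1)) on separate lines.
        aggregate(events).forEach((element, c) ->
                System.out.println("(" + element + ", (" + c[0] + "," + c[1] + "))"));
    }
}
```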

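Finally, the ratio is computed with a function such as (i1,i2) -> i2/(i1+i2), where i1 is the number of successes and i2 the number of errors: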

(A, 100%)
(B, 33.3%)
(C, 50%)
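A minimal sketch of the ratio step in plain Java, assuming i1 is the success count and i2 the error count. The cast to `double` matters: with integer arithmetic, `1 / (2 + 1)` evaluates to 0. The method name `errorRate` is hypothetical.

```java
import java.util.Locale;

// Error rate for one element given its (successes, errors) pair: i2 / (i1 + i2).
public class ErrorRate {

    static double errorRate(int successes, int errors) {
        int total = successes + errors;
        if (total == 0) {
            throw new IllegalArgumentException("no events for this element");
        }
        return (double) errors / total; // floating-point division, not integer division
    }

    public static void main(String[] args) {
        // Locale.ROOT keeps "." as the decimal separator regardless of system locale.
        System.out.println(String.format(Locale.ROOT, "A %.1f%%", 100 * errorRate(0, 1))); // A 100.0%
        System.out.println(String.format(Locale.ROOT, "B %.1f%%", 100 * errorRate(2, 1))); // B 33.3%
        System.out.println(String.format(Locale.ROOT, "C %.1f%%", 100 * errorRate(1, 1))); // C 50.0%
    }
}
```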

As far as I understand, the result would be produced by the reduceByKeyAndWindow() function, for example:

JavaPairDStream<String, Double> res = 
pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(1));
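For context on the call above: `reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)` takes a `Function2<V, V, V>`, so the reduced values keep the input value type. For the result to be `JavaPairDStream<String, Double>`, `pairs` would already have to hold `Double` values; counts are usually reduced first and the ratio computed in a later mapping. Both durations must be multiples of the batch interval; with 1-second batches, the call covers the last 30 batches and is recomputed every batch. The plain-Java class below is a hypothetical model of what such a windowed reduce computes, not Spark's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Model of a sliding-window reduce: keep the last `windowBatches` per-batch maps and, on each
// new batch (slide = 1 batch), reduce every key's values across them with the same function.
public class WindowedReduceModel<V> {
    private final int windowBatches;
    private final BinaryOperator<V> reduceFunc;
    private final Deque<Map<String, V>> window = new ArrayDeque<>();

    WindowedReduceModel(int windowBatches, BinaryOperator<V> reduceFunc) {
        this.windowBatches = windowBatches;
        this.reduceFunc = reduceFunc;
    }

    // Adds one micro-batch (already reduced per key) and returns the reduced window.
    Map<String, V> onBatch(Map<String, V> batch) {
        window.addLast(batch);
        if (window.size() > windowBatches) {
            window.removeFirst(); // the oldest batch slides out of the window
        }
        Map<String, V> result = new HashMap<>();
        for (Map<String, V> b : window) {
            b.forEach((k, v) -> result.merge(k, v, reduceFunc));
        }
        return result;
    }

    public static void main(String[] args) {
        // Window of 3 batches sliding by 1 batch, summing per-batch error counts.
        WindowedReduceModel<Integer> model = new WindowedReduceModel<>(3, Integer::sum);
        Map<String, Integer> b1 = new HashMap<>();
        b1.put("A", 1);
        Map<String, Integer> b2 = new HashMap<>();
        b2.put("A", 2);
        b2.put("B", 1);
        Map<String, Integer> b3 = new HashMap<>();
        b3.put("B", 1);
        System.out.println(model.onBatch(b1));
        System.out.println(model.onBatch(b2));
        System.out.println(model.onBatch(b3));
        System.out.println(model.onBatch(new HashMap<>())); // batch b1 has slid out
    }
}
```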

Working backwards through the application's flow, my questions are:

How do I define a pair with multiple values or keys on a JavaPairDStream (perhaps something like JavaPairDStream<String, Tuple2<Integer,Integer>>)?

What is the best reduceFunc for such a pair with multiple values?

What is the best way to map the initial DStream (perhaps something like JavaDStream<Tuple2<String, String>> line = input.map(func))?
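One possible single-pass approach to these three questions (an alternative, not what the answer below does) is to map each (element, status) to (element, (1, 0)) or (element, (0, 1)), reduce by adding the pairs component by component, and only then compute the ratio. In Spark's Java API that would roughly be a `mapToPair` into `JavaPairDStream<String, Tuple2<Integer, Integer>>`, then `reduceByKeyAndWindow((a, b) -> new Tuple2<>(a._1() + b._1(), a._2() + b._2()), Durations.seconds(30), Durations.seconds(1))`, then `mapValues` for the rate; treat that as an untested sketch. The runnable plain-Java model below uses a hypothetical `Counts` class in place of `scala.Tuple2` so it needs no Spark dependency.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the single-pass alternative:
// map (element, status) -> (element, (successes, errors)), reduce by componentwise addition.
public class PairValuedCounts {

    // Hypothetical stand-in for scala.Tuple2<Integer, Integer>: (successes, errors).
    static final class Counts {
        final int successes;
        final int errors;

        Counts(int successes, int errors) {
            this.successes = successes;
            this.errors = errors;
        }
    }

    // "map" step; assumes the status is either "Success" or "Error".
    static Counts toCounts(String status) {
        return "Error".equals(status) ? new Counts(0, 1) : new Counts(1, 0);
    }

    // "reduceFunc": associative and commutative, as a (windowed) reduce by key requires.
    static Counts add(Counts a, Counts b) {
        return new Counts(a.successes + b.successes, a.errors + b.errors);
    }

    // Reduce per key, then the "mapValues" step: errors / (successes + errors).
    static Map<String, Double> errorRates(List<String[]> events) {
        Map<String, Counts> reduced = new LinkedHashMap<>();
        for (String[] e : events) {
            reduced.merge(e[0], toCounts(e[1]), PairValuedCounts::add);
        }
        Map<String, Double> rates = new LinkedHashMap<>();
        reduced.forEach((k, c) -> rates.put(k, (double) c.errors / (c.successes + c.errors)));
        return rates;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
                new String[] {"A", "Error"}, new String[] {"B", "Success"},
                new String[] {"B", "Error"}, new String[] {"B", "Success"},
                new String[] {"C", "Success"}, new String[] {"C", "Error"});
        System.out.println(errorRates(events)); // {A=1.0, B=0.3333333333333333, C=0.5}
    }
}
```

Because the pair can also be subtracted component by component, the same value type would suit the `reduceByKeyAndWindow` overload that takes an inverse reduce function, which requires checkpointing to be enabled.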

Thanks in advance for your help.


1 Answer


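I've found a solution. Using the Function classes and Tuples, you can build in Java any combination you would build in Scala. The problem is that I didn't find any documentation or examples of this in Java. You'll find my solution below, in case it helps anyone in the future.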

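import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.Optional; // Spark 2.x; Spark 1.x used com.google.common.base.Optional
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

JavaPairDStream<String,String> samples = lines.flatMapToPair(new PairFlatMapFunction<String,String, String>() {
            public Iterator<Tuple2<String,String>> call(String s) throws Exception {
                // Some logic on my data. Hypothetical example: assumes lines shaped like "(A, Error)".
                String[] parts = s.trim().replaceAll("[()]", "").split(",");
                return Arrays.asList(new Tuple2<String, String>(parts[0].trim(), parts[1].trim())).iterator();
            }
        });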


JavaPairDStream<Tuple2<String,String>, Integer> samplePairs = samples.mapToPair(
               new PairFunction<Tuple2<String,String>, Tuple2<String,String>, Integer>() {
                    public Tuple2<Tuple2<String,String>, Integer> call(Tuple2<String,String> t) {
                        return new Tuple2<Tuple2<String,String>, Integer>(t, 1);
                    }
                });

        JavaPairDStream<String, Integer> countErrors = samplePairs.filter(new Function<Tuple2<Tuple2<String,String>,Integer>,Boolean>() {
            public Boolean call(Tuple2<Tuple2<String,String>, Integer> t)
            {
               return (t._1._2.equals("Error"));
           }}).mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>, String, Integer>() {
            public Tuple2<String,Integer> call(Tuple2<Tuple2<String,String>,Integer> t) {
                return new Tuple2<String, Integer>(t._1._1, t._2);
            }
        }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }}, Durations.seconds(30), Durations.seconds(1));

        JavaPairDStream<String, Integer> countSuccess= samplePairs.filter(new Function<Tuple2<Tuple2<String,String>,Integer>,Boolean>() {
            public Boolean call(Tuple2<Tuple2<String,String>, Integer> t)
            {
                return (t._1._2.equals("Success"));
            }}).mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>, String, Integer>() {
            public Tuple2<String,Integer> call(Tuple2<Tuple2<String,String>,Integer> t) {
                return new Tuple2<String, Integer>(t._1._1, t._2);
            }
        }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }}, Durations.seconds(30), Durations.seconds(1));

        JavaPairDStream<String,Tuple2<Optional<Integer>,Optional<Integer>>> countPairs = countSuccess.fullOuterJoin(countErrors);

        JavaPairDStream<String, Double> mappedRDD = countPairs
                .mapToPair(new PairFunction<Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>>, String, Double>() {
                    public Tuple2<String, Double> call(Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>> t) throws Exception {
                        // countSuccess is the left side of the join and countErrors the right side;
                        // a missing side means no events of that status in the window
                        double successes = t._2()._1().isPresent() ? t._2()._1().get() : 0.0;
                        double errors = t._2()._2().isPresent() ? t._2()._2().get() : 0.0;
                        // Error rate = errors / (successes + errors); the sum is never 0 because the
                        // full outer join only emits keys that are present on at least one side
                        return new Tuple2<String, Double>(t._1(), errors / (successes + errors));
                    }
                });
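To see which key combinations the join produces and what the final mapping should yield, here is a plain-Java model of this pipeline without windowing: per-status counts (the two filter and reduce chains), a full outer join in which a missing side becomes an empty `Optional`, and the rate errors / (successes + errors). `java.util.Optional` stands in for Spark's `Optional`, and all names are hypothetical. Element A has no successes, which is why an inner join would drop it and a full outer join is needed; the model gives the question's expected 100%, 33.3% and 50%.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java model (not Spark) of: filter per status -> count per key -> full outer join -> rate.
public class JoinPipelineModel {

    // Counts events per element for one status, like one filter + reduce chain.
    static Map<String, Integer> countStatus(List<String[]> events, String status) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] e : events) {
            if (status.equals(e[1])) {
                counts.merge(e[0], 1, Integer::sum);
            }
        }
        return counts;
    }

    // Full outer join on the key (every key from either side, empty Optional where a side
    // is missing), then error rate = errors / (successes + errors).
    static Map<String, Double> errorRates(Map<String, Integer> success, Map<String, Integer> errors) {
        Set<String> keys = new TreeSet<>(success.keySet());
        keys.addAll(errors.keySet());
        Map<String, Double> rates = new LinkedHashMap<>();
        for (String k : keys) {
            Optional<Integer> s = Optional.ofNullable(success.get(k));
            Optional<Integer> e = Optional.ofNullable(errors.get(k));
            double sv = s.orElse(0);
            double ev = e.orElse(0);
            rates.put(k, ev / (sv + ev));
        }
        return rates;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
                new String[] {"A", "Error"}, new String[] {"B", "Success"},
                new String[] {"B", "Error"}, new String[] {"B", "Success"},
                new String[] {"C", "Success"}, new String[] {"C", "Error"});
        Map<String, Integer> success = countStatus(events, "Success"); // no entry for A
        Map<String, Integer> errors = countStatus(events, "Error");
        System.out.println(errorRates(success, errors)); // {A=1.0, B=0.3333333333333333, C=0.5}
    }
}
```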
Answered 2016-11-21T18:42:38.073