4

我是 Spark 新手,我想使用 spark 流和 spark 批处理实现 lambda 架构。

在网上看了一下,找到了以下文章:

http://blog.cloudera.com/blog/2014/08/building-lambda-architecture-with-spark-streaming/

这对我的一些分析来说很好,但我认为这个解决方案在必须找到不同元素的情况下是不可行的。

如果要在 JavaRDD 上查找不同的元素,可以使用 distinct 方法。DStreams 是一组 RDD,所以如果你申请

transform((rdd) -> rdd.distinct()) 

在 Dstream 上的方法,您将在流的每个 rdd 上执行 distinct,因此您将在每个 RDD 中找到不同的元素,而不是在整个 DStream 上。

可能这样写有点混乱,所以让我用一个例子来澄清一下:

我有以下元素:

Apple
Pear
Banana
Peach
Apple
Pear

在批处理应用程序中:

JavaRDD<String> elemsRDD=sc.textFile(exFilePath).distinct() 

子 RDD 将包含:

Apple
Pear
Banana
Peach

如果我理解正确,这应该是流的行为:

假设我们有一个 1s 的批处理时间和一个 2s 的窗口:

第一个RDD:

Apple  
Pear
Banana

第二个RDD:

Peach
Apple
Pear

JavaDStream<String> elemsStream=(getting from whathever source)
childStream = elemsStream.transform((rdd) -> rdd.distinct())
childStream.forEachRDD...

最终会得到 2 个 Rdds:首先:

Apple  
Pear
Banana

第二:

Peach
Apple
Pear

这是对 RDD 的独特尊重,而不是对 DStream 的尊重。

我对 Streaming 部分的解决方案如下:

JavaDStream<HashSet<String>> distinctElems = elemsStream.map(
                (elem) -> {
                    HashSet<String> htSet = new HashSet<String>();
                    htSet.add(elem);
                    return htSet;
                }).reduce((sp1, sp2) -> {
                    sp1.addAll(sp2);
                    return sp1;
                });

这样的结果是:

Apple
Pear
Banana
Peach

作为批处理模式。但是,此解决方案将需要维护开销,并且存在因重复代码库而导致错误的风险。

有没有更好的方法来尽可能多地重用批处理模式的代码来达到相同的结果?

提前致谢。

4

1 回答 1

1

您的解决方案很优雅。

我有其他解决方案,它不如你的优雅,但我不知道它是否更有效。这是我基于 mapToPairFunction 的解决方案

JavaPairDStream<String, Integer> distinctElems = elemsStream
       .mapToPair(event -> new Tuple2<String, Integer>(event,1));
distinctElems = distinctElems.reduceByKey((t1, t2) -> t1);

我认为这更有效,但我无法测试它。

于 2015-02-27T10:17:18.263 回答