2

输入 1: KV 数据流。
输入 2:一些静态数据分区(用于处理输入 1 中的流)
问题可以建模为下图:与 HDFS/RDD 分区共存:我们如何确保流任务 Map1、Map2 和 Map3 在以下机器上运行HDFS/RDD 分区是否存在?图像描述:假设K
在此处输入图像描述


是流式密钥(不是元组)。First Map 将其转换为元组(具有空值)并将其广播到 3 个 Mapper。每个映射器运行在包含不同分区的 RDD(或 HDFS 文件,这是第二个输入和静态数据)的不同节点上。每个 Mapper 使用 RDD 分区来计算键的值。最后,我们要为键聚合值(使用 reduceByKey _+_)。

4

2 回答 2

1

如果我理解正确:

  1. KRDD***DStream通过streaming工作得到的。我不知道您传入数据的来源。这个数据基本上是一个array/seq/listKeys。
  2. 您提到的静态数据是一种PairedRDD形式<K, Object>。从 中Object,您要提取Val_n中的键incoming RDD
  3. 您的目标是避免/最小化shuffle这个join(或查找)过程。

为此,最好的策略是对两个 RDDJoin使用相同incoming RDD的. 如果其中一个数据 RDD 比另一个小得多,您可以探索较小的一个。我最近在我的项目中尝试过这个,并在帖子中分享了经验:Random Partitioner behavior on the joined RDDStatic RDDpartitionedPartitionerbroadcasting

编辑:由于您想处理您的密钥K(假设 K=Set{K1, K2...Kn}),使用StaticRDD, 与分区一起使用,我建议采用如下方法。我没有检查语法和正确性,但你会明白的。

val kRddBroadcastVar = .... // broadcasted variable 
val keyValRDD = staticRDD.mapPartitions {   
       iter => transformKRddToTuple2Events(iter, kRddBroadcastVar )
     }

def transformKRddToTuple2Events( iter: Iterator[Object], kRddBroadcastVar: List[KeyObjectType] ) : Iterator[(keyObjectType, valueObjectType )] {    
     val staticList = iter.toList
     val toReturn   = kRddBroadcastVar.map ( k => getKeyValue(k, staticList) )    
     toReturn.iterator
}

val outRdd = keyValRDD.reduceByKey( _ + _ )

如果这有意义,请将此答案标记为已接受。

于 2016-03-01T18:19:13.730 回答
1

您的静态 RDD 是否足够小以进行缓存。在这种情况下,Spark 将尝试在这些节点上运行流式传输任务。但它不能保证。

此外,如果参考数据很小,为什么不广播该数据集。

我们一直在尝试解决与我们的数据存储 SnappyData ( http://www.snappydata.io/ ) 中的首选位置有关的类似问题,其中数据位置是一等公民。

于 2016-03-02T06:20:04.683 回答