输入 1: KV 数据流。
输入 2:一些静态数据分区(用于处理输入 1 中的流)
问题可以建模为下图:与 HDFS/RDD 分区共存:我们如何确保流任务 Map1、Map2 和 Map3 在以下机器上运行HDFS/RDD 分区是否存在?图像描述:假设K
是流式密钥(不是元组)。First Map 将其转换为元组(具有空值)并将其广播到 3 个 Mapper。每个映射器运行在包含不同分区的 RDD(或 HDFS 文件,这是第二个输入和静态数据)的不同节点上。每个 Mapper 使用 RDD 分区来计算键的值。最后,我们要为键聚合值(使用 reduceByKey _+_)。
问问题
736 次
2 回答
1
如果我理解正确:
- 这
K
是RDD
你***DStream
通过streaming
工作得到的。我不知道您传入数据的来源。这个数据基本上是一个array/seq/list
Keys。 - 您提到的静态数据是一种
PairedRDD
形式<K, Object>
。从 中Object
,您要提取Val_n
中的键incoming RDD
。 - 您的目标是避免/最小化
shuffle
这个join
(或查找)过程。
为此,最好的策略是对两个 RDDJoin
使用相同incoming RDD
的. 如果其中一个数据 RDD 比另一个小得多,您可以探索较小的一个。我最近在我的项目中尝试过这个,并在帖子中分享了经验:Random Partitioner behavior on the joined RDDStatic RDD
partitioned
Partitioner
broadcasting
编辑:由于您想处理您的密钥K
(假设 K=Set{K1, K2...Kn}),使用StaticRDD
, 与分区一起使用,我建议采用如下方法。我没有检查语法和正确性,但你会明白的。
val kRddBroadcastVar = .... // broadcasted variable
val keyValRDD = staticRDD.mapPartitions {
iter => transformKRddToTuple2Events(iter, kRddBroadcastVar )
}
def transformKRddToTuple2Events( iter: Iterator[Object], kRddBroadcastVar: List[KeyObjectType] ) : Iterator[(keyObjectType, valueObjectType )] {
val staticList = iter.toList
val toReturn = kRddBroadcastVar.map ( k => getKeyValue(k, staticList) )
toReturn.iterator
}
val outRdd = keyValRDD.reduceByKey( _ + _ )
如果这有意义,请将此答案标记为已接受。
于 2016-03-01T18:19:13.730 回答
1
您的静态 RDD 是否足够小以进行缓存。在这种情况下,Spark 将尝试在这些节点上运行流式传输任务。但它不能保证。
此外,如果参考数据很小,为什么不广播该数据集。
我们一直在尝试解决与我们的数据存储 SnappyData ( http://www.snappydata.io/ ) 中的首选位置有关的类似问题,其中数据位置是一等公民。
于 2016-03-02T06:20:04.683 回答